C#实现控制线程池最大数并发线程

1. 实验目的:

使用线程池的时候,有时候需要考虑服务器的最大线程数目和程序最快执行所有业务逻辑的取舍。
并非逻辑线程越多也好,而且新的逻辑线程必须会在线程池的等待队列中等待 ,直到线程池中工作的线程执行完毕,
才会有系统线程取出等待队列中的逻辑线程,进行CPU运算。

2.  解决问题:

<a>如果不考虑服务器实际可支持的最大并行线程个数,程序不停往线程池申请新的逻辑线程,这个时候我们可以发现CPU的使用率会不断飙升,并且内存、网络带宽占用也会随着逻辑线程在CPU队列中堆积,而不断增大。

<b>如果我们想在主程序有200个http网络通讯需要执行,如何每次循环用10个线程并发处理10个网络http通讯回话,下一次循环只有在上一次循环的10个线程都执行完毕后才会执行下一次循环,并且主程序监听和等待200个http网络通讯都在CPU线程池中执行完毕后,才会退出主程序。

3.  实现逻辑:

我们通过两个AutoResetEvent和线程监听器Monitor,分别实现:

<a>wait_sync:   任务线程的 并发执行,每次循环只处理最大10个线程分别对网络做http通讯回话。并且当前循环的10个线程都执行完毕后,才会进行下一次循环处理。
       <b> wait_main: 主程序线程的监听和等待,只有所有任务线程都执行完毕后,主程序线程才会退出程序。
       <c> list_Thread: 负责记录每次循环,CPU实际分配的系统线程的个数。和Monitor配合使用,Monitor.Enter(list_Thread)=占用共享线程资源的占用锁,Monitor.Exit(list_Thread)释放共享线程资源的占用锁。  
       <d> n_total_thread: 配合wait_main使用,记录全部逻辑线程,已经执行完毕的当前总个数,用来判断主线程是否还需要继续等待,还是可以结束主程序运行。

4. 主要代码:

<a> 线程池控制代码,如下:

/// <summary>
/// 多线程调用WCF
/// </summary>
/// <param name="select">调用WCF的方式,1=Restful,2=Tcp</param>
/// <param name="num"></param>
static void DoTest_MultiThread(string select, long num)
{
  int n_max_thread = 10; // 设置并行最大为10个线程
  int n_total_thread = 0; // 用来控制:主程序的结束执行,当所有任务线程执行完毕

  ILog log_add = new LogHelper("Add_Thread");
  ILog log_del = new LogHelper("Del_Thread");
  ILog log_wait = new LogHelper("Wait_Thread");
  ILog log_set = new LogHelper("Set_Thread");
  ILog log_for = new LogHelper("For_Thread");

  Console.Title = string.Format("调用WCF的方式 => {0}, 调用次数=> {1}"
    , select == "1" ? "Restful" : "Socket"
    , num);

  List<int> list_Thread = new List<int>();

  System.Threading.AutoResetEvent wait_sync = new System.Threading.AutoResetEvent(false); // 用来控制:并发最大个数线程=n_max_thread
  System.Threading.AutoResetEvent wait_main = new System.Threading.AutoResetEvent(false); // 用来控制:主程序的结束执行,当所有任务线程执行完毕

  DateTime date_step = DateTime.Now;
  for (long i = 0; i < num; i++)
  {
    Num_Query_Static++;
    if (i >0 && (i+1-1) % n_max_thread == 0) // -1 表示第max个线程尚未开始
    {
      //log_wait.Info(string.Format("thread n= {0},for i= {1}", dic_Thread.Count, i + 1));
      wait_sync.WaitOne(); // 每次并发10个线程,等待处理完毕后,在发送下一次并发线程
    }
    log_for.Info(string.Format("thread n= {0},for i= {1}", list_Thread.Count, i + 1));

    System.Threading.ThreadPool.QueueUserWorkItem
      ((data) =>
      {
        int id = System.Threading.Thread.CurrentThread.ManagedThreadId;
        System.Threading.Monitor.Enter(list_Thread);
        list_Thread.Add(id);
        System.Threading.Monitor.Exit(list_Thread);

        log_add.Info(string.Format("id={0}, count={1}", id, list_Thread.Count)); // 日志

        if (select == "1") // Restful方式调用
        {
          Query_Htty();
        }
        else
        {
          Query_Socket();
        }

        n_total_thread += 1;
        if (list_Thread.Count == (n_max_thread) || n_total_thread == num)
        {
          list_Thread.Clear();
          //log_set.Info(string.Format("thread n= {0},for i= {1}", dic_Thread.Count, i + 1));
          //wait_sync.Set();
          if (n_total_thread != num)
          {
            wait_sync.Set(); // 任务线程,继续执行
          }
          else
          {
            wait_main.Set(); // 主程序线程,继续执行
          }
        }
      }, list_Thread);
  }

  wait_main.WaitOne();

  Console.WriteLine(string.Format("总测试{0}次,总耗时{1}, 平均耗时{2}"
    , num
    , (DateTime.Now - date_step).ToString()
    , (DateTime.Now - date_step).TotalMilliseconds / num));

  Query_Thread();
}

 <b> WCF后台服务代码

private static ILog log = new LogHelper("SeqService"); // 日志
private static Dictionary<int, DateTime> dic_thread = new Dictionary<int, DateTime>(); // 线程列表

private static long Num = 0; // 线程个数
private static object lock_Num = 0; // 共享数据-锁

/// <summary>
/// 在线申请流水号
/// </summary>
/// <returns></returns>
[WebGet(UriTemplate = "GetSeqNum/Json", ResponseFormat = WebMessageFormat.Json)]
public string GetSeqNumber()
{
  lock (lock_Num)
  {
    Num++;
    int id_thread = System.Threading.Thread.CurrentThread.ManagedThreadId;
    DateTime now = DateTime.Now;
    if (!dic_thread.TryGetValue(id_thread, out now))
    {
      dic_thread.Add(id_thread, DateTime.Now);
    }

  }
  string ret = DateTime.Now.ToString("yyyyMMdd") + Num.ToString(new string('0', 9)); 

  log.Info(string.Format("{0}, Thread={1}/{2}", ret, System.Threading.Thread.CurrentThread.ManagedThreadId, dic_thread.Count));
  return ret;
}
  

5.  实验结果

1. 10000个WCF网络http请求,CPU分成每次10个(10可以按需求调整)线程并发执行,并且主程序在所有请求都执行完毕后,才退出主程序。

1. 前端日志:LogFile\Add_Thread\Info

2. WCF日志:LogFile\SeqService\Info

(0)

相关推荐

  • C#实现的Win32控制台线程计时器功能示例

    本文实例讲述了C#实现的Win32控制台线程计时器功能.分享给大家供大家参考,具体如下: 在C#中提供了三种类型的计时器: 1.基于 Windows 的标准计时器(System.Windows.Forms.Timer) 2.基于服务器的计时器(System.Timers.Timer) 3.线程计时器(System.Threading.Timer) 一.基于 Windows 的标准计时器(System.Windows.Forms.Timer) 首先注意一点就是:Windows 计时器是为单线程环境

  • C#线程执行超时处理与并发线程数控制实例

    本文实例讲述了C#线程执行超时处理与并发线程数控制的方法.分享给大家供大家参考.具体实现方法如下: 特别说明: 1.为了测试方便,这里对存储过程的执行是模拟的 2.这里限制了并发执行存储过程的最大个数,但并没有对并发线程数进行控制,与文章标题略有不符,但程序稍做改动即可控制并发线程数 代码如下: 复制代码 代码如下: using System; using System.Collections.Generic; using System.ComponentModel; using System.

  • C#多线程之线程控制详解

    本文为大家分享了C#多线程之线程控制,供大家参考,具体内容如下 方案一: 调用线程控制方法.启动:Thread.Start();停止:Thread.Abort();暂停:Thread.Suspend();继续:Thread.Resume(); private void btn_Start_Click(object sender, EventArgs e) { mThread.Start(); // 开始 } private void btn_Stop_Click(object sender, E

  • C#通过Semaphore类控制线程队列的方法

    本文实例讲述了C#通过Semaphore类控制线程队列的方法.分享给大家供大家参考.具体实现方法如下: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; using System.Diagnostics; using System.Threading; using System.ComponentModel; using System.Col

  • C#控制台下测试多线程的方法

    本文实例讲述了C#控制台下多线程实现方法.分享给大家供大家参考.具体如下: class Program { static void Main(string[] args) { ThreadStart num = new ThreadStart(PrintNum); Thread ConstrolNum = new Thread(num); ThreadStart str = new ThreadStart(PrintStr); Thread ConstrolStr = new Thread(st

  • C#实现控制线程池最大数并发线程

    1. 实验目的: 使用线程池的时候,有时候需要考虑服务器的最大线程数目和程序最快执行所有业务逻辑的取舍. 并非逻辑线程越多也好,而且新的逻辑线程必须会在线程池的等待队列中等待 ,直到线程池中工作的线程执行完毕, 才会有系统线程取出等待队列中的逻辑线程,进行CPU运算. 2.  解决问题: <a>如果不考虑服务器实际可支持的最大并行线程个数,程序不停往线程池申请新的逻辑线程,这个时候我们可以发现CPU的使用率会不断飙升,并且内存.网络带宽占用也会随着逻辑线程在CPU队列中堆积,而不断增大. &l

  • java线程池:获取运行线程数并控制线程启动速度的方法

    在java里, 我们可以使用Executors.newFixedThreadPool 来创建线程池, 然后就可以不停的创建新任务,并用线程池来执行了. 在提交任务时,如果线程池已经被占满,任务会进到一个队列里等待执行. 这种机制在一些特定情况下会有些问题.今天我就遇到一种情况:创建线程比线程执行的速度要快的多,而且单个线程占用的内存又多,所以很快内存就爆了. 想了一个办法,就是在提交任务之前,先检查目前正在执行的线程数目,只有没把线程池占满的时候在去提交任务. 代码很简单: int thread

  • springmvc配置线程池Executor做多线程并发操作的代码实例

    加载xml文件 在ApplicationContext.xml文件里面添加 xmlns:task="http://www.springframework.org/schema/task" xmlns文件并且xsi:schemaLocation中添加 http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd 在spring中配置Executor

  • 一种类似JAVA线程池的C++线程池实现方法

    什么是线程池 线程池(thread pool)是一种线程使用模式.线程过多或者频繁创建和销毁线程会带来调度开销,进而影响缓存局部性和整体性能.而线程池维护着多个线程,等待着管理器分配可并发执行的任务.这避免了在处理短时间任务时创建与销毁线程的代价,以及保证了线程的可复用性.线程池不仅能够保证内核的充分利用,还能防止过分调度. 线程池的实现 线程池在JAVA平台上已经有成熟的实现方式,本文介绍参考JAVA线程池实现方式实现的C++线程池类库. 该类库代码已上传至github仓库中,下载地址为:ht

  • JAVA 自定义线程池的最大线程数设置方法

    一:CPU密集型: 定义:CPU密集型也是指计算密集型,大部分时间用来做计算逻辑判断等CPU动作的程序称为CPU密集型任务.该类型的任务需要进行大量的计算,主要消耗CPU资源.  这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数. 特点:    01:CPU 使用率较高(也就是经常计算一些复杂的运算,逻辑处理等情况)非常多的情况下使用    02:针对单台机

  • 线上dubbo线程池耗尽CyclicBarrier线程屏障异常解决记录

    目录 事件背景 问题定位 解决问题 文末结语 事件背景 系统相关使用人员反馈系统故障,日志显示从ams系统服务提示dubbo处理线程不足,具体异常信息如下: 问题定位 从上图可知,dubbo的处理线程池满了,默认200个线程,活动线程也是200个.这个现象非常不正常,我们的应用并发还没有到这个程度能同时占用200个线程处理请求.然后去读了下dubbo源码,发现dubbo也认为这种情况不正常,然后帮我们记录了应用的线程堆栈信息,这个非常赞.代码如下: 上面这段代码,在线程池不够用时,会每隔十分钟输

  • java线程池中Worker线程执行流程原理解析

    目录 引言 Worker类分析 runWorker(Worker)方法 getTask()方法 beforeExecute(Thread, Runnable)方法 afterExecute(Runnable, Throwable)方法 processWorkerExit(Worker, boolean)方法 tryTerminate()方法 terminated()方法 引言 在<[高并发]别闹了,这样理解线程池执行任务的核心流程才正确!!>一文中我们深度分析了线程池执行任务的核心流程,在Th

  • JDK线程池和Spring线程池的使用实例解析

    这篇文章主要介绍了JDK线程池和Spring线程池的使用实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 JDK线程池和Spring线程池实例,异步调用,可以直接使用 (1)JDK线程池的使用,此处采用单例的方式提供,见示例: public class ThreadPoolUtil { private static int corePoolSize = 5; private static int maximumPoolSize = 10;

  • java ThreadPool线程池的使用,线程池工具类用法说明

    实际上java已经提供线程池的实现 ExecutorService. 为了更方便的使用和管理.这里提供一个线程池工具类,方便大家的使用. 直接看看代码: 使用 public static void main(String[] args) { //实例化一个固定数目的线程池.具体参考类的构造方法 ThreadPool threadPool=new ThreadPool(ThreadPool.FixedThread,5); //线程池执行线程 threadPool.execute(new Runna

  • 详解Java线程池如何统计线程空闲时间

    背景介绍 你刚从学校毕业后,到新公司实习,试用期又被毕业,然后你又不得不出来面试,好在面试的时候碰到个美女面试官! 面试官: 小伙子,我看你简历上写的项目中用到了线程池,你知道线程池是怎样实现复用线程的? 这面试官是不是想坑我?是不是摆明了不让我通过? 难道你不应该问线程池有哪些核心参数?每个参数具体作用是什么? 往线程池中不断提交任务,线程池的处理流程是什么? 这些才是你应该问的,这些八股文我已经背熟了,你不问,瞎问什么复用线程? 幸亏我看了一灯的八股文,听我给你背一遍! 我: 线程池复用线程

随机推荐