C#并行编程之信号量

一:CountdownEvent

这种采用信号状态的同步基元非常适合在动态的fork,join的场景,它采用“信号计数”的方式,就比如这样,一个麻将桌只能容纳4个人打麻将,如果后来的人也想搓一把碰碰运气,那么他必须等待直到麻将桌上的人走掉一位。好,这就是简单的信号计数机制,从技术角度上来说它是定义了最多能够进入关键代码的线程数。

但是CountdownEvent更牛X之处在于我们可以动态的改变“信号计数”的大小,比如一会儿能够容纳8个线程,一下又4个,一下又10个,这样做有什么好处呢?还是承接上一篇文章所说的,比如一个任务需要加载1w条数据,那么可能出现这种情况。

加载User表:根据user表的数据量,我们需要开5个task。

加载Product表:产品表数据相对比较多,计算之后需要开8个task。

加载order表:由于我的网站订单丰富,计算之后需要开12个task。

先前的文章也说了,我们需要协调task在多阶段加载数据的同步问题,那么如何应对这里的5,8,12,幸好,CountdownEvent给我们提供了可以动态修改的解决方案。

我们看到有两个主要方法:Wait和Signal。每调用一次Signal相当于麻将桌上走了一个人,直到所有人都搓过麻将wait才给放行,这里同样要注意也就是“超时“问题的存在性,尤其是在并行计算中,轻量级别给我们提供了”取消标记“的机制,这是在重量级别中不存在的,比如下面的重载public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken),具体使用可以看前一篇文章的介绍。

//默认的容纳大小为“硬件线程“数
static CountdownEvent cde = new CountdownEvent(Environment.ProcessorCount);

static void Main(string[] args)
{
    //加载User表需要5个任务
    var userTaskCount = 5;

    //重置信号
    cde.Reset(userTaskCount);

    for (int i = 0; i < userTaskCount; i++)
    {
        Task.Factory.StartNew((obj) =>
        {
            LoadUser(obj);
        }, i);
    }

    //等待所有任务执行完毕
    cde.Wait();

    Console.WriteLine("\nUser表数据全部加载完毕!\n");

    //加载product需要8个任务
    var productTaskCount = 8;

    //重置信号
    cde.Reset(productTaskCount);

    for (int i = 0; i < productTaskCount; i++)
    {
        Task.Factory.StartNew((obj) =>
        {
            LoadProduct(obj);
        }, i);
    }

    cde.Wait();

    Console.WriteLine("\nProduct表数据全部加载完毕!\n");

    //加载order需要12个任务
    var orderTaskCount = 12;

    //重置信号
    cde.Reset(orderTaskCount);

    for (int i = 0; i < orderTaskCount; i++)
    {
        Task.Factory.StartNew((obj) =>
        {
            LoadOrder(obj);
        }, i);
    }

    cde.Wait();

    Console.WriteLine("\nOrder表数据全部加载完毕!\n");

    Console.WriteLine("\n(*^__^*) 嘻嘻,恭喜你,数据全部加载完毕\n");

    Console.Read();
}

static void LoadUser(object obj)
{
    try
    {
        Console.WriteLine("当前任务:{0}正在加载User部分数据!", obj);
    }
    finally
    {
        cde.Signal();
    }
}

static void LoadProduct(object obj)
{
    try
    {
        Console.WriteLine("当前任务:{0}正在加载Product部分数据!", obj);
    }
    finally
    {
        cde.Signal();
    }
}

static void LoadOrder(object obj)
{
    try
    {
        Console.WriteLine("当前任务:{0}正在加载Order部分数据!", obj);
    }
    finally
    {
        cde.Signal();
    }
}

二:SemaphoreSlim

在.net 4.0之前,framework中有一个重量级的Semaphore,人家可以跨进程同步,咋轻量级不行,msdn对它的解释为:限制可同时访问某一资源或资源池的线程数。关于它的重量级demo,我的上一个系列有演示,你也可以理解为CountdownEvent是 SemaphoreSlim的功能加强版,好了,举一个轻量级使用的例子。

static SemaphoreSlim slim = new SemaphoreSlim(Environment.ProcessorCount, 12);

static void Main(string[] args)
{
    for (int i = 0; i < 12; i++)
    {
        Task.Factory.StartNew((obj) =>
        {
            Run(obj);
        }, i);
    }

    Console.Read();
}

static void Run(object obj)
{
    slim.Wait();

    Console.WriteLine("当前时间:{0}任务 {1}已经进入。", DateTime.Now, obj);

    //这里busy3s中
    Thread.Sleep(3000);

    slim.Release();
}

同样,防止死锁的情况,我们需要知道”超时和取消标记“的解决方案,像SemaphoreSlim这种定死的”线程请求范围“,其实是降低了扩展性,所以说,试水有风险,使用需谨慎,在觉得有必要的时候使用它。

三: ManualResetEventSlim

相信它的重量级别大家都知道是ManualReset,而这个轻量级别采用的是"自旋等待“+”内核等待“,也就是说先采用”自旋等待的方式“等待,直到另一个任务调用set方法来释放它。如果迟迟等不到释放,那么任务就会进入基于内核的等待,所以说如果我们知道等待的时间比较短,采用轻量级的版本会具有更好的性能,原理大概就这样,下面举个小例子。

//2047:自旋的次数
 static ManualResetEventSlim mrs = new ManualResetEventSlim(false, 2047);

 static void Main(string[] args)
 {

     for (int i = 0; i < 12; i++)
     {
         Task.Factory.StartNew((obj) =>
         {
             Run(obj);
         }, i);
     }

     Console.WriteLine("当前时间:{0}我是主线程{1},你们这些任务都等2s执行吧:\n",
     DateTime.Now,
     Thread.CurrentThread.ManagedThreadId);
     Thread.Sleep(2000);

     mrs.Set();
 }

 static void Run(object obj)
 {
     mrs.Wait();

     Console.WriteLine("当前时间:{0}任务 {1}已经进入。", DateTime.Now, obj);
 }

到此这篇关于C#并行编程之信号量的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 深入了解c# 信号量和互斥体

    一.信号量(Semaphore) 信号量(Semaphore)是由内核对象维护的int变量,当信号量为0时,在信号量上等待的线程会堵塞,信号量大于0时,就解除堵塞.当在一个信号量上等待的线程解除堵塞时,内核自动会将信号量的计数减1.在.net 下通过Semaphore类来实现信号量同步. Semaphore类限制可同时访问某一资源或资源池的线程数.WaitOne method, which is inherited from the WaitHandle class, and release t

  • C#信号量用法简单示例

    本文实例讲述了C#信号量用法.分享给大家供大家参考,具体如下: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; /* * 标题:如何使用信号量的示例代码 * Author:kagula * Date:2015-6-16 * Environment:VS2010SP1, .NET Framework 4 client profi

  • C#并行编程之Task同步机制

    目录 一.隔离执行:不共享数据,让每个task都有一份自己的数据拷贝. 1.传统方式 2.ThreadLocal类 二.同步类型:通过调整task的执行,有序的执行task. 1.Lock锁 2.Interlocked 联锁 3.Mutex互斥体 三.申明性同步 四.并发集合 五.Barrier(屏障同步) 在并行计算中,不可避免的会碰到多个任务共享变量,实例,集合.虽然task自带了两个方法:task.ContinueWith()和Task.Factory.ContinueWhenAll()来

  • C#并行编程之Task任务

    任务,基于线程池.其使我们对并行编程变得更简单,且不用关心底层是怎么实现的.System.Threading.Tasks.Task类是Task Programming Library(TPL)中最核心的一个类. 一.任务与线程 1:任务是架构在线程之上的,也就是说任务最终还是要抛给线程去执行. 2:任务跟线程不是一对一的关系,比如开10个任务并不是说会开10个线程,这一点任务有点类似线程池,但是任务相比线程池有很小的开销和精确的控制. 我们用VS里面的“并行任务”看一看,快捷键Ctrl+D,K,

  • C#并行编程之PLINQ(并行LINQ)

    用于对内存中的数据做并行运算,也就是说其只支持 LINQ to Object 的并行运算 一.AsParallel(并行化) 就是在集合后加个AsParallel(). 例如: var numbers = Enumerable.Range(0, 100); var result = numbers.AsParallel().AsOrdered().Where(i => i % 2 == 0); foreach (var i in result) Console.WriteLine(i); 下面我

  • C#并行编程之数据并行Tasks.Parallel类

    目录 一.并行概念 1.并行编程 2.数据并行 二.Parallel.Invoke():并行调用多个任务 . 三.Parallel.For(): for 循环的并行运算 四.Parallel.ForEach():foreach 循环的并行运算 五.线程局部变量 1.Parallel.For中定义局部变量: 2.Parallel.Each中定义局部变量: 六.Break.Stop中断与停止线程 七.Cancel取消循环 八.Handel Exceptions异常处理 一.并行概念 1.并行编程 在

  • C#并行编程之信号量

    一:CountdownEvent 这种采用信号状态的同步基元非常适合在动态的fork,join的场景,它采用“信号计数”的方式,就比如这样,一个麻将桌只能容纳4个人打麻将,如果后来的人也想搓一把碰碰运气,那么他必须等待直到麻将桌上的人走掉一位.好,这就是简单的信号计数机制,从技术角度上来说它是定义了最多能够进入关键代码的线程数. 但是CountdownEvent更牛X之处在于我们可以动态的改变“信号计数”的大小,比如一会儿能够容纳8个线程,一下又4个,一下又10个,这样做有什么好处呢?还是承接上

  • Guava - 并行编程Futures详解

    Guava为Java并行编程Future提供了很多有用扩展,其主要接口为ListenableFuture,并借助于Futures静态扩展. 继承至Future的ListenableFuture,允许我们添加回调函数在线程运算完成时返回值或者方法执行完成立即返回. 对ListenableFuture添加回调函数: Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor) 其中 FutureCallbac

  • 异步/多线程/任务/并行编程之一:如何选择合适的多线程模型?

    异步.多线程.任务.并行编程之一:选择合适的多线程模型 本篇概述: @FCL4.0中已经存在的线程模型,以及它们之间异同点: @多线程编程模型的选择. 1:异步.多线程.任务.并行的本质 这四个概念对应在CLR中的本质,本质都是多线程. 异步,简单的讲就是BeginInvoke.EndInvoke模式,它在CLR内部线程池进行管理: 多线程,体现在C#中,可以由类型Thread发起.也可以由ThreadPool发起.前者不受CLR线程池管理,后者则是.FCL团队为了各种编程模型的方便,还另外提供

  • 浅谈在Java中使用Callable、Future进行并行编程

    使用Callable.Future进行并行编程 在Java中进行并行编程最常用的方式是继承Thread类或者实现Runnable接口.这两种方式的缺点是在任务完成后无法直接获取执行结果,必须通过共享变量或线程间通信,使用起来很不方便. 从Java1.5开始提供了Callable和Future两个接口,通过使用它们可以在任务执行完毕后得到执行结果. 下面我们来学习下如何使用Callable.Future和FutureTask. Callable接口 Callable接口位于java.util.co

  • 分析Java并发编程之信号量Semaphore

    目录 一.认识Semaphore 1.1.Semaphore 的使用场景 1.2.Semaphore 使用 1.3.Semaphore 信号量的模型 二.Semaphore 深入理解 2.1.Semaphore 基本属性 2.2.Semaphore 的公平性和非公平性 2.3.其他 Semaphore 方法 一.认识Semaphore 1.1.Semaphore 的使用场景 Semaphore 的使用场景主要用于流量控制,比如数据库连接,同时使用的数据库连接会有数量限制,数据库连接不能超过一定的

  • python中的多cpu并行编程

    目录 多cpu并行编程 安装 使用 submit 函数定义 多核cpu并行计算 多cpu并行编程 python多线程只能算并发,因为它智能使用一个cpu内核 python下pp包支持多cpu并行计算 安装 pip install pp 使用 #-*- coding: UTF-8 -*- import math, sys, time import pp def IsPrime(n): """返回n是否是素数""" if not isinstance

  • C#并行编程Task类用法介绍

    Task和ThreadPool的功能类似,可以用来创建一些轻量级的并行任务.对于将一个任务放进线程池 ThreadPool.QueueUserWorkItem(A); 这段代码用Task来实现的话,方式如下: Task.Factory.StartNew(A); 这两端代码的使用和实现的功能都十分相似.但和TheadPool相比,Task有着更多的功能,更加方便我们使用. 假如我们要创建三个任务,并等待它们完成.这个功能用TheadPool实现如下: using (ManualResetEvent

  • Python并行编程多线程锁机制Lock与RLock实现线程同步

    目录 什么是锁机制? Lock() 管理线程 RLock() 与Lock()的区别 什么是锁机制? 要回答这个问题,我们需要知道为什么需要使用锁机制.前面我们谈到一个进程内的多个线程的某些资源是共享的,这也是线程的一大优势,但是也随之带来一个问题,即当两个及两个以上的线程同时访问共享资源时,如果此时没有预设对应的同步机制,就可能带来同一时刻多个线程同时访问同一个共享资源,即出现竞态,多数情况下我们是不希望出现这样的情况的,那么怎么避免呢? Lock() 管理线程 先看一段代码: import t

  • Mapreduce分布式并行编程

    目录 1.什么是并行计算 2.现在mapreduce能做什么? map:映射 reduce:做比较,工作整合,上下游 有些操作放在map.reduce里面都可以 1.project(投射)map完成 2.filter(过滤)map完成 3.key(汇集) 数据SQL: oss和hive的区别? 1. 搭建各类环境 2. 搭建.配置zookeeper 3. 启动zookeeper 4. 安装配置java 5. 主从节点格式化 6. 启动集群 7. 安装Scala 8. 启动spark集群 9. j

随机推荐