C#并行编程之Task同步机制

目录
  • 一、隔离执行:不共享数据,让每个task都有一份自己的数据拷贝。
    • 1、传统方式
    • 2、ThreadLocal类
  • 二、同步类型:通过调整task的执行,有序的执行task。
    • 1、Lock锁
    • 2、Interlocked 联锁
    • 3、Mutex互斥体
  • 三、申明性同步
  • 四、并发集合
  • 五、Barrier(屏障同步)

在并行计算中,不可避免的会碰到多个任务共享变量,实例,集合。虽然task自带了两个方法:task.ContinueWith()和Task.Factory.ContinueWhenAll()来实现任务串行化,但是这些简单的方法远远不能满足我们实际的开发需要,从.net 4.0开始,类库给我们提供了很多的类来帮助我们简化并行计算中复杂的数据同步问题。

一、隔离执行:不共享数据,让每个task都有一份自己的数据拷贝。

对数据共享问题处理的方式是“分离执行”,我们通过把每个Task执行完成后的各自计算的值进行最后的汇总,也就是说多个Task之间不存在数据共享了,各自做各自的事,完全分离开来。

1、传统方式

每个Task执行时不存在数据共享了,每个Task中计算自己值,最后我们汇总每个Task的Result。我们可以通过Task中传递的state参数来进行隔离执行:

int Sum = 0;
Task<int>[] tasks = new Task<int>[10];
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task<int>((obj) =>
    {
        var start = (int)obj;
        for (int j = 0; j < 1000; j++)
        {
            start = start + 1;
        }
        return start;
    }, Sum);
    tasks[i].Start();
}
Task.WaitAll(tasks);
for (var i = 0; i < 10; i++)
{
    Sum += tasks[i].Result;
}
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

2、ThreadLocal类

在.Net中提供了System.Threading.ThreadLocal来创建分离。

ThreadLocal是一种提供线程本地存储的类型,它可以给每个线程一个分离的实例,来提供每个线程单独的数据结果。上面的程序我们可以使用TreadLocal:

int Sum = 0;
Task<int>[] tasks = new Task<int>[10];

var tl = new ThreadLocal<int>();
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task<int>((obj) =>
    {
        tl.Value = (int)obj;
        for (int j = 0; j < 1000; j++)
        {
            tl.Value++;
        }
        returntl.Value;
    }, Sum);
    tasks[i].Start();
}
Task.WaitAll(tasks);
for (var i = 0; i < 10; i++)
{
    Sum += tasks[i].Result;
}
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

但是我们要注意的一点TreadLocal是针对每个线程的,不是针对每个Task的。一个Tread中可能有多个Task。

ThreadLocal类举例:

static ThreadLocal<string> local;
static void Main()
{
    //创建ThreadLocal并提供默认值
    local = new ThreadLocal<string>(() => "hehe");

    //修改TLS的线程
    Thread th = new Thread(() =>
     {
         local.Value = "Mgen";
         Display();
     });

    th.Start();
    th.Join();
    Display();
}

//显示TLS中数据值
static void Display()
{
    Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value);
}

二、同步类型:通过调整task的执行,有序的执行task

同步类型是一种用来调度Task访问临界区域的一种特殊类型。在.Net 4.0中提供了多种同步类型给我们使用,主要分为:轻量级的、重量级的和等待处理型的,在下面我们会介绍常用的同步处理类型。

常用的同步类型

首先来看看.Net 4.0中常见的几种同步类型以及处理的相关问题:

同步类型以及解决问题

  • lock关键字、Montor类、SpinLock类:有序访问临界区域
  • Interlocked类:数值类型的增加或则减少
  • Mutex类:交叉同步
  • WaitAll方法:同步多个锁定(主要是Task之间的调度)
  • 申明性的同步(如Synchronization):使类中的所有的方法同步

1、Lock锁

其实最简单同步类型的使用办法就是使用lock关键字。在使用lock关键字时,首先我们需要创建一个锁定的object,而且这个object需要所有的task都能访问,其次能我们需要将我们的临界区域包含在lock块中。我们之前例子中代码可以这样加上lock:

int Sum = 0;
Task[] tasks = new Task[10];

var obj = new Object(); for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task(() =>
    {
        for (int j = 0; j < 1000; j++)
        {
            lock (obj)
            { Sum = Sum + 1; }
        }
    });
    tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

其实lock关键字是使用Monitor的一种简短的方式,lock关键字自动通过调用Monitor.Enter\Monitor.Exit方法来处理获得锁以及释放锁。

2、Interlocked 联锁

Interlocked通过使用操作系统或则硬件的一些特性提供了一些列高效的静态的同步方法。其中主要提供了这些方法:Exchange、Add、Increment、CompareExchange四种类型的多个方法的重载。我们将上面的例子中使用Interlocked:

int Sum = 0;
Task[] tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task(() =>
    {
        for (int j = 0; j < 1000; j++)
        {
            Interlocked.Increment(ref Sum);
        }
    });
    tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

3、Mutex互斥体

Mutex也是一个同步类型,在多个线程进行访问的时候,它只向一个线程授权共享数据的独立访问。我们可以通过Mutex中的WaitOne方法来获取Mutex的所有权,但是同时我们要注意的是,我们在一个线程中多少次调用过WaitOne方法,就需要调用多少次ReleaseMutex方法来释放Mutex的占有。上面的例子我们通过Mutex这样实现:

int Sum = 0;
Task[] tasks = new Task[10];

var mutex = new Mutex();
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task(() =>
    {
        for (int j = 0; j < 1000; j++)
        {
            bool lockAcquired = mutex.WaitOne(); try
            {
                Sum++;
            }
            finally
            {
                if (lockAcquired) mutex.ReleaseMutex();
            }
        }
    });
    tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

三、申明性同步

我们可以通过使用Synchronization 特性来标识一个类,从而使一个类型的字段以及方法都实现同步化。在使用Synchronization 时,我们需要将我们的目标同步的类继承于System.ContextBoundObject类型。我们来看看之前的例子我们同步标识Synchronization 的实现:

static void Main(string[] args)
{
    var sum = new SumClass();
    Task[] tasks = new Task[10];
    for (int i = 0; i < 10; i++)
    {
        tasks[i] = new Task(() =>
        {
            for (int j = 0; j < 1000; j++)
            {
                sum.Increment();
            }
        });
        tasks[i].Start();
    }
    Task.WaitAll(tasks);
    Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, sum.GetSum());
}

[Synchronization]
class SumClass : ContextBoundObject
{
    private int _Sum;

    public void Increment()
    {
        _Sum++;
    }

    public int GetSum()
    {
        return _Sum;
    }
}

四、并发集合

当多个线程对某个非线程安全容器并发地进行读写操作时,这些操作将导致不可预估的后果或者会导致报错。为了解决这个问题我们可以使用lock关键字或者Monitor类来给容器上锁。但锁的引入使得我们的代码更加复杂,同时也带来了更多的同步消耗。而.NET Framework 4提供的线程安全且可拓展的并发集合能够使得我们的并行代码更加容易编写,此外,锁的使用次数的减少也减少了麻烦的死锁与竞争条件的问题。.NET Framework 4主要提供了如下几种并发集合:BlockingCollection,ConcurrentBag,ConcurrentDictionary,ConcurrentQueue,ConcurrentStack。这些集合通过使用一种叫做比较并交换(compare and swap, CAS)指令和内存屏障的技术来避免使用重量级的锁。

在.Net 4.0中提供了很多并发的集合类型来让我们处理数据同步的集合的问题,这里面包括:

1.ConcurrentQueue:提供并发安全的队列集合,以先进先出的方式进行操作;
2.ConcurrentStack:提供并发安全的堆栈集合,以先进后出的方式进行操作;
3.ConcurrentBag:提供并发安全的一种无序集合;
4.ConcurrentDictionary:提供并发安全的一种key-value类型的集合。

我们在这里只做ConcurrentQueue的一个尝试,并发队列是一种线程安全的队列集合,我们可以通过Enqueue()进行排队、TryDequeue()进行出队列操作:

for (var j = 0; j < 10; j++)
{
    var queue = new ConcurrentQueue<int>();
    var count = 0;
    for (var i = 0; i < 1000; i++)
    {
        queue.Enqueue(i);
    }
    var tasks = new Task[10];
    for (var i = 0; i < tasks.Length; i++)
    {
        tasks[i] = new Task(() =>
        {
            while (queue.Count > 0)
            {
                int item;
                var isDequeue = queue.TryDequeue(out item);
                if (isDequeue) Interlocked.Increment(ref count);
            }

        });
        tasks[i].Start();
    }
    try
    {
        Task.WaitAll(tasks);
    }
    catch (AggregateException e)
    {
        e.Handle((ex) =>
        {
            Console.WriteLine("Exception Message:{0}", ex.Message);
            return true;
        });
    }
    Console.WriteLine("Dequeue items count :{0}", count);
}

五、Barrier(屏障同步)

barrier叫做屏障,就像下图中的“红色线”,如果我们的屏障设为4个task就认为已经满了的话,那么执行中先到的task必须等待后到的task,通知方式也就是barrier.SignalAndWait(),屏障中线程设置操作为new Barrier(4,(i)=>{})。SignalAndWait给我们提供了超时的重载,为了能够取消后续执行

//四个task执行
static Task[] tasks = new Task[4];

static Barrier barrier = null;

static void Main(string[] args)
{
    barrier = new Barrier(tasks.Length, (i) =>
     {
         Console.WriteLine("**********************************************************");
         Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);
         Console.WriteLine("**********************************************************");
     });

    for (int j = 0; j < tasks.Length; j++)
    {
        tasks[j] = Task.Factory.StartNew((obj) =>
        {
            var single = Convert.ToInt32(obj);

            LoadUser(single);
            barrier.SignalAndWait();

            LoadProduct(single);
            barrier.SignalAndWait();

            LoadOrder(single);
            barrier.SignalAndWait();
        }, j);
    }

    Task.WaitAll(tasks);

    Console.WriteLine("指定数据库中所有数据已经加载完毕!");

    Console.Read();
}

static void LoadUser(int num)
{
    Console.WriteLine("当前任务:{0}正在加载User部分数据!", num);
}

static void LoadProduct(int num)
{
    Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
}

static void LoadOrder(int num)
{
    Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
}

到此这篇关于C#并行编程之Task同步机制的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • C#中的Task.WaitAll和Task.WaitAny方法介绍

    一.简介 Task.WaitAll:等待所有提供的 Task 对象完成执行过程. Task.WaitAny:等待提供的任一 Task 对象完成执行过程. 二.代码案例 Task.WaitAll 代码: class Program { public class DownLoadTest { Stopwatch watch = new Stopwatch(); public DownLoadTest() { watch.Start(); } public async Task DoRunTaskAs

  • C#并行编程之数据并行Tasks.Parallel类

    目录 一.并行概念 1.并行编程 2.数据并行 二.Parallel.Invoke():并行调用多个任务 . 三.Parallel.For(): for 循环的并行运算 四.Parallel.ForEach():foreach 循环的并行运算 五.线程局部变量 1.Parallel.For中定义局部变量: 2.Parallel.Each中定义局部变量: 六.Break.Stop中断与停止线程 七.Cancel取消循环 八.Handel Exceptions异常处理 一.并行概念 1.并行编程 在

  • C#使用Task.ContinueWith组合任务

    代码案例 简单Demo 代码: public static void Main() { //创建一个任务 Task<int> task = new Task<int>(() => { int sum = 0; Console.WriteLine("使用Task異步執行操作."); for (int i = 0; i <= 100; i++) { sum += i; } return sum; }); //启动任务,并安排到当前任务队列线程中执行任务(

  • C#并行编程之Task任务

    任务,基于线程池.其使我们对并行编程变得更简单,且不用关心底层是怎么实现的.System.Threading.Tasks.Task类是Task Programming Library(TPL)中最核心的一个类. 一.任务与线程 1:任务是架构在线程之上的,也就是说任务最终还是要抛给线程去执行. 2:任务跟线程不是一对一的关系,比如开10个任务并不是说会开10个线程,这一点任务有点类似线程池,但是任务相比线程池有很小的开销和精确的控制. 我们用VS里面的“并行任务”看一看,快捷键Ctrl+D,K,

  • C#异步编程Task的创建方式

    一.简介 ThreadPool相比Thread来说具备了很多优势,但是ThreadPool却又存在一些使用上的不方便.比如:Task支持线程的取消.完成.失败通知等交互性操作,但是ThreadPool不支持:Task支持线程执行的先后次序,但是ThreadPool不支持::以往,如果开发者要实现上述功能,需要完成很多额外的工作,现在,FCL中提供了一个功能更强大的概念:Task.Task在线程池的基础上进行了优化,并提供了更多的API.在FCL4.0中,如果我们要编写多线程程序,Task显然已经

  • C#多线程编程Task用法详解

    目录 一.基本概念 Task优势 二.Task用法 创建任务 1.使用Task创建无返回值 2.使用Task.Run方法创建任务 3.使用Factory方式创建任务 4.创建带返回值的Task 三.常见方法 1.WaitAll() 2.WaitAny() 3.ContinueWhenAll() 4.ContinueWhenAny 5.ContinueWith 一.基本概念 Task优势 ThreadPool相比Thread来说具备了很多优势,但是ThreadPool却又存在一些使用上的不方便,例

  • C#连续任务Task.ContinueWith方法

    一.简介 通过任务,可以指定在任务完成之后,应开始运行之后另一个特定任务.ContinueWith是Task根据其自身状况,决定后续应该作何操作.也就是说,在运行完task后,会执行task.continuewith(XX)中的XX语句,但是是否执行.如何执行等需要看task的运行情况.例如:一个使用前一个任务的结果的新任务,如果前一个任务失败了,这个任务就应执行一些清理工作.任务处理程序都不带参数或者带一个对象参数,而任务的连续处理方法都有一个Task类型的参数. 二.案例 案例一: 代码:

  • C#中的Task.WhenAll和Task.WhenAny方法介绍

    一.简介 Task.WhenAll().Task.WhenAny()这两个与Task.WaitALL().Task.WaitAny()是有区别的,When是异步的,Wait是同步的.Task.WhenAll():所有提供的任务已完成时,创建将完成的任务.Task.WhenAny():任何提供的任务已完成时,创建将完成的任务. 二.代码案例 Task.WhenAll 代码: class Program { public class DownLoadTest { Stopwatch watch =

  • C#并行编程之Task同步机制

    目录 一.隔离执行:不共享数据,让每个task都有一份自己的数据拷贝. 1.传统方式 2.ThreadLocal类 二.同步类型:通过调整task的执行,有序的执行task. 1.Lock锁 2.Interlocked 联锁 3.Mutex互斥体 三.申明性同步 四.并发集合 五.Barrier(屏障同步) 在并行计算中,不可避免的会碰到多个任务共享变量,实例,集合.虽然task自带了两个方法:task.ContinueWith()和Task.Factory.ContinueWhenAll()来

  • C#并行编程之PLINQ(并行LINQ)

    用于对内存中的数据做并行运算,也就是说其只支持 LINQ to Object 的并行运算 一.AsParallel(并行化) 就是在集合后加个AsParallel(). 例如: var numbers = Enumerable.Range(0, 100); var result = numbers.AsParallel().AsOrdered().Where(i => i % 2 == 0); foreach (var i in result) Console.WriteLine(i); 下面我

  • Java多线程编程之CountDownLatch同步工具使用实例

    好像倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当到达0时,所有等待者就开始执行. java.util.concurrent.CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待.用给定的计数初始化CountDownLatch.由于调用了countDown()方法,所以在当前计数到达零之前,await方法会一直受阻塞.之后,会释放所有等待的线程,await的所有后续调用都将立即返回.这种现

  • asyncio异步编程之Task对象详解

    目录 1.Task对象的作用 2.如何创建task对象 3.示例一(目前不推荐这种写法) 4.示例2 5.示例3(算是以上示例2的简化版) 总结 1.Task对象的作用 可以将多个任务添加到事件循环当中,达到多任务并发的效果 2.如何创建task对象 asyncio.create_task(协程对象) 注意:create_task只有在python3.7及以后的版本中才可以使用,就像asyncio.run()一样, 在3.7以前可以使用asyncio.ensure_future()方式创建tas

  • Java并发编程之JUC并发核心AQS同步队列原理剖析

    目录 一.AQS介绍 二.AQS中的队列 1.同步等待队列 2.条件等待队列 3.AQS队列节点Node 三.同步队列源码分析 1.同步队列分析 2.同步队列--独占模式源码分析 3.同步队列--共享模式源码分析 一.AQS介绍 队列同步器AbstractQueuedSynchronizer(简称AQS),AQS定义了一套多线程访问共享资源的同步器框架,是用来构建锁或者其他同步组件的基础框架,是一个依赖状态(state)的同步器.Java并发编程的核心在java.util.concurrent(

  • Android编程之Sdcard相关代码集锦

    本文实例讲述了Android编程之Sdcard相关代码.分享给大家供大家参考,具体如下: 1. 检测Sdcard是否可用: public static boolean sdCardIsAvailable() { String status = Environment.getExternalStorageState(); if (!status.equals(Environment.MEDIA_MOUNTED)) { return false; } return true; } 2. 获得程序在s

  • 对Python协程之异步同步的区别详解

    一下代码通过协程.多线程.多进程的方式,运行代码展示异步与同步的区别. import gevent import threading import multiprocessing # 这里展示同步和异步的性能区别,可以看到异步直接同时执行并完成, # 而同步,需要等待第一个完成后再次执行下一个,是有顺序的执行,而异步不需要 import time def task(pid): gevent.sleep(0.5) print('Task %s done' % pid) def task2(pid)

  • Java并发编程之Condition源码分析(推荐)

    Condition介绍 上篇文章讲了ReentrantLock的加锁和释放锁的使用,这篇文章是对ReentrantLock的补充.ReentrantLock#newCondition()可以创建Condition,在ReentrantLock加锁过程中可以利用Condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活".Condition常用在基于异步通信的同步机制实现中,比如dubbo中的请求和获取应答结果的实现. 常用方法 Condition中主要的

  • 深入学习C#网络编程之HTTP应用编程(下)

    第三篇来的好晚啊,上一篇说了如何向服务器推送信息,这一篇我们看看如何"快好准"的从服务器下拉信息. 网络上有很多大资源文件,比如供人下载的zip包,电影(你懂的),那么我们如何快速的进行下载,大家第一反应肯定就是多线程下载, 那么这些东西是如何做的呢?首先我们可以从"QQ的中转站里面拉一个rar下来". 然后用fiddler监视一下,我们会发现一个有趣的现象: 第一:7.62*1024*1024≈7990914  千真万确是此文件 第二:我明明是一个http链接,t

随机推荐