C#并行编程之数据并行Tasks.Parallel类

目录
  • 一、并行概念
    • 1、并行编程
    • 2、数据并行
  • 二、Parallel.Invoke():并行调用多个任务 。
  • 三、Parallel.For(): for 循环的并行运算
  • 四、Parallel.ForEach():foreach 循环的并行运算
  • 五、线程局部变量
    • 1、Parallel.For中定义局部变量:
    • 2、Parallel.Each中定义局部变量:
  • 六、Break、Stop中断与停止线程
  • 七、Cancel取消循环
  • 八、Handel Exceptions异常处理

一、并行概念

1、并行编程

在.NET 4中的并行编程是依赖Task Parallel Library(后面简称为TPL) 实现的。在TPL中,最基本的执行单元是task(中文可以理解为"任务"),一个task就代表了你要执行的一个操作。你可以为你所要执行的每一个操作定义一个task,TPL就负责创建线程来执行你所定义的task,并且管理线程。TPL是面向task的,自动的;而传统的多线程是以人工为导向的。

现在已经进入了多核的时代,我们的程序如何更多的利用好硬件cpu,答案是并行处理。在.net4.0之前我们要开发并行的程序是非常的困难,在.net4.0中,在命名空间System.Threading.Tasks提供了方便的并行开发的类库。

2、数据并行

数据并行指的是对源集合或数组的元素同时(即,并行)执行相同操作的场景。 在数据并行操作中,对源集合进行分区,以便多个线程能够同时在不同的网段上操作。

任务并行库 (TPL) 支持通过 System.Threading.Tasks.Parallel 类实现的数据并行。 此类对 for 循环和 foreach 循环提供了基于方法的并行执行。你为Parallel.For 或 Parallel.ForEach 循环编写的循环逻辑与编写连续循环的相似。 无需创建线程或列工作项。 在基本循环中,不需要加锁。TPL 为你处理所有低级别的工作。

Parallel.For()和Parallel.ForEach()方法多次调用同一个方法,而Parallel.Invoke()方法允许同时调用不同的方法。

二、Parallel.Invoke():并行调用多个任务 。

例1:同时调用2个任务

static void Main(string[] args)
{
    var watch = Stopwatch.StartNew();
    Parallel.Invoke(Run1, Run2);
    watch.Stop();
    Console.WriteLine("我是并行开发,总共耗时:{0}", watch.ElapsedMilliseconds)
}

static void Run1()
{
    Console.WriteLine("我是任务一,我跑了3s");
    Thread.Sleep(3000);
}

static void Run2()
{
    Console.WriteLine("我是任务二,我跑了5s");
    Thread.Sleep(5000);
}

例2:说明并不是每个任务一个线程。

// 定义一个线程局部变量,返回其线程名
ThreadLocal<string> ThreadName = new ThreadLocal<string>(() =>
{
    return "Thread" + Thread.CurrentThread.ManagedThreadId;
});

//  打印出当前线程名的方法。
Action action = () =>
{
    // 如果 ThreadName.IsValueCreated 为true,在这个线程上不是第一次运行这个方法。
    bool repeat = ThreadName.IsValueCreated;
    Console.WriteLine("ThreadName = {0} {1}", ThreadName.Value, repeat ? "(repeat)" : "");
};

// 调用8个方法,你应该会看到一些重复的线程名
Parallel.Invoke(action, action, action, action, action, action, action, action);
ThreadName.Dispose();

三、Parallel.For(): for 循环的并行运算

我们知道串行代码中也有一个for,但是那个for并没有用到多核,而Paraller.for它会在底层根据硬件线程的运行状况来充分的使用所有的可利用的硬件线程,注意这里的Parallel.for的步行是1。

在For()方法中,前两个参数定义了循环的开头和结束。示例从0迭代到9。第3个参数是一个 Action<int>委托。整数参数是循环的迭代次数,该参数被传递给Action < int >委托引用的方法。 Parallel.For方法的返回类型是ParallelLoopResult结构,它提供了循环是否结束的信息。

ParallelLoopResult result = Parallel.For(0, 10, i =>
 {
     Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
     Thread.Sleep(10);
 });
Console.WriteLine(result.IsCompleted);

首先先写一个普通的循环:

private void NormalFor()
{
    for (var i = 0; i < 10000; i++)
    {
        for (var j = 0; j < 1000; j++)
        {
            for (var k = 0; k < 100; k++)
            {
                DoSomething();
            }
        }
    }
}

再看一个并行的For语句:

private void ParallelFor()
{
    Parallel.For(0, 10000, i =>
    {
        for (int j = 0; j < 1000; j++)
        {
            for (var k = 0; k < 100; k++)
            {
                DoSomething();
            }
        }

    });
}

上面的例子中,只是将最外层的For语句替换成了Parallel.For,Parallel执行速度可以提高近一倍。

四、Parallel.ForEach():foreach 循环的并行运算

private void NormalForeach()
{
    foreach (var file in GetFiles())
    {
        DoSomething();
    }

}

private void ParallelForeach()
{
    Parallel.ForEach(GetFiles(), file => {
        DoSomething();
    });
}

ForEach的使用跟For使用几乎是差不多了,只是在对非泛型的Collection进行操作的时候,需要通过Cast方法进行转换。

ForEach的独到之处就是可以将数据进行分区,每一个小区内实现串行计算,分区采用Partitioner.Create实现。

for (int j = 1; j < 4; j++)
{
    Console.WriteLine("\n第{0}次比较", j);
    ConcurrentBag<int> bag = new ConcurrentBag<int>();
    var watch = Stopwatch.StartNew();
    watch.Start();
    for (int i = 0; i < 3000000; i++)
    {
        bag.Add(i);
    }
    Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);

    GC.Collect();
    bag = new ConcurrentBag<int>();
    watch = Stopwatch.StartNew();
    watch.Start();
    Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
    {
        for (int m = i.Item1; m < i.Item2; m++)
        {
            bag.Add(m);
        }
    });
    Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
    GC.Collect();
}

五、线程局部变量

下面这段代码多次运行每次的结果都不一样,因为total变量是公共的,而我们的程序是多个线程的加,而多个线程之间是不能把数据共享的。

public void NormalParallelTest()
{
    int[] nums = Enumerable.Range(0, 1000000).ToArray();
    long total = 0;
    Parallel.For(0,nums.Length,i=>
    {
        total += nums[i];
    });
    Console.WriteLine("The total is {0}", total);
}

其实我们需要的是在每个线程中计算出一个和值,然后再进行累加。我们来看看线程局部变量:

泛型方法Parallel.For<T>的原型:

public static ParallelLoopResult
For<TLocal>

(int fromInclusive, int toExclusive,
 Func<TLocal> localInit,
 Func<int, ParallelLoopState, TLocal, TLocal> body,
 Action<TLocal>
localFinally
);
  • TLocal:线程变量的类型;第一个、第二个参数就不必多说了,就是起始值跟结束值。
  • localInit:每个线程的线程局部变量初始值的设置;
  • body:每次循环执行的方法,其中方法的最后一个参数就是线程局部变量;
  • localFinally:每个线程之后执行的方法。

1、Parallel.For中定义局部变量:

从2开始,累加2个,得49.

int[] nums = Enumerable.Range(0, 10).ToArray();
long total = 0;

Parallel.For<long>(0, nums.Length, () => { return 2; }, 

    (j, loop, subtotal) =>//1、每次循环执行的方法
    {
        subtotal += nums[j];
        Console.WriteLine("主体: thread {1}, task {2},结果:{0}", j+ ":" +nums[j] + "-" + subtotal,     Thread.CurrentThread.ManagedThreadId, Task.CurrentId);

        return subtotal;
    },

    (x) =>//2、每个线程执行之后执行的方法
    {

        Console.WriteLine(" 最终执行:thread {1}, task {2},结果:{0} ", x, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
        Interlocked.Add(ref total, x);
    });
Console.WriteLine("The total is {0}", total);

2、Parallel.Each中定义局部变量:

要注意的是,我们必须要使用ForEach<TSource, TLocal>,因为第一个参数表示的是迭代源的类型,第二个表示的是线程局部变量的类型,其方法的参数跟For是差不多的。

public void ForeachThreadLocalTest()
{
    int[] nums = Enumerable.Range(0, 1000000).ToArray();
    long total = 0;
    Parallel.ForEach<int,long>(nums,()=>0,

        (member,loopState,subTotal)=>//1、每次循环执行的方法
        {
            subTotal += member;
            return subTotal;
        },

        (perLocal)=>//2、每个线程执行之后执行的方法
           Interlocked.Add(ref total,perLocal)
       );
    Console.WriteLine("The total is {0}", total);
}

六、Break、Stop中断与停止线程

在并行循环的委托参数中提供了一个ParallelLoopState,该实例提供了Break和Stop方法来帮我们实现。

  • Break“中断”:表示完成当前线程上当前迭代之前的所有线程上的所有迭代,然后退出循环。(比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。)
  • Stop“停止”:表示在方便的情况下尽快停止所有迭代。(比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。)

首先我们可以看到在Parallel.For的一个重载方法:

public static ParallelLoopResult
For
(int fromInclusive, int toExclusive, Action<int, ParallelLoopState > body)

在委托的最后一个参数类型为ParallelLoopState,而ParallelLoopState里面提供给我们两个方法:Break、Stop来终止迭代。

private void StopLoop()
{
    var Stack = new ConcurrentStack<string>();
    Parallel.For(0, 10000, (i, loopState
) =>
    {
        if (i < 1000)
            Stack.Push(i.ToString());
        else
        {
            loopState.Stop();
            return;
        }
    });
    Console.WriteLine("Stop Loop Info:\n elements count:{0}", Stack.Count);
}

七、Cancel取消循环

在并行的循环中支持通过传递ParallelOptions参数中的CancellationToken进行取消循环的控制,我们可以CancellationTokenSource实例化之后传递给ParallelOptions对象Cancellation值。下面来看个示例:

在For循环的实现代码内部,Parallel类验证CancellationToken 的结果,并取消操作。一旦取消操作,For()方法就抛出个OperationCanceledException类型的异常,这是本例捕获的异常。使用 CancellationTokeri可以注册取消操作时的信息。为此,需要调用Register方法,并传递一个在取消 操作时调用的委托。

var cts = new CancellationTokenSource();
cts.Token.Register(() =>Console.WriteLine("*** token canceled"));

// start a task that sends a cancel after 500 ms
new Task(() =>
{
    Thread.Sleep(500);
    cts.Cancel(false);
}).Start();

try
{
    ParallelLoopResult result =
    Parallel.For(0, 100,
    new <strong>ParallelOptions</strong>()
    {
        CancellationToken = cts.Token,
    },
    x =>
    {
        Console.WriteLine("loop {0} started", x);
        int sum = 0;
        for (int i = 0; i < 100; i++)
        {
            Thread.Sleep(2);
            sum += i;
        }
        Console.WriteLine("loop {0} finished", x);
    });
}
catch (OperationCanceledException ex)
{
    Console.WriteLine(ex.Message);
}

八、Handel Exceptions异常处理

在处理并行循环的异常的与顺序循环异常的处理是有所不同的,并行循环里面可能会一个异常在多个循环中出现,或则一个线程上的异常导致另外一个线程上也出现异常。比较好的处理方式就是,首先获取所有的异常最后通过AggregateException来包装所有的循环的异常,循环结束后进行throw。看一段示例代码:

private void HandleNumbers(int[] numbers)
{
    var exceptions = new ConcurrentQueue<Exception>();
    Parallel.For(0, numbers.Length, i =>
    {
        try
        {
            if (numbers[i] > 10 && numbers[i] < 20)
            {
                throw new Exception(String.Format("numbers[{0}] betwewn 10 to 20",i));
            }
        }
        catch (Exception e)
        {
            exceptions.Enqueue(e);
        }
    });

if (exceptions.Count > 0) throw new AggregateException(exceptions); }

测试方法:

public void HandleExceptions()
{
    var numbers = Enumerable.Range(0, 10000).ToArray();
    try
    {
        this.HandleNumbers(numbers);
    }
    catch(AggregateException exceptions)
    {
        foreach (var ex in exceptions.InnerExceptions)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

对上面的方法说明下,在HandleNumbers方法中,就是一个小的demo如果元素的值出现在10-20之间就抛出异常。在上面我们的处理方法就是:在循环时通过队列将所有的异常都集中起来,循环结束后来抛出一个AggregateException。

到此这篇关于C#并行编程之数据并行Tasks.Parallel类的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • C#中Parallel类For、ForEach和Invoke使用介绍

    一.简介: Parallel类提供了数据和任务的并行性:Paraller.For()方法类似于C#的for循环语句,也是多次执行一个任务.使用Paraller.For()方法,可以并行运行迭代,迭代的顺序没有定义.在For()方法中,前两个参数是固定的,这两个参数定义了循环的开头和结束.首先描述它的第一个方法For(int,int,Action<int>),前面两个参数代表循环的开头和介绍,第三个参数是个委托,整数参数是循环的迭代次数,该参数被传递给委托引用的方法.Paraller.For()

  • C#多线程之Parallel类的用法

    Parallel类是对线程的一个抽象.该类位于System.Threading.Tasks名称空间中,提供了数据和任务并行性. Paraller类定义了数据并行地For和ForEach的静态方法,以及任务并行的Invoke的静态方法.Parallel.For()和Parallel.ForEach()方法在每次迭代中调用相同的代码,Paraller.Invoke()允许调用不同的方法. 1.Parallel.For Parallel.For()方法类似C#语法的for循环语句,多次执行一个任务.但

  • c# Parallel类的使用

    Parallel类是对线程的抽象,提供数据与任务的并行性.类定义了静态方法For和ForEach,使用多个任务来完成多个作业.Parallel.For和Parallel.ForEach方法在每次迭代的时候调用相同的代码,而Parallel.Invoke()方法允许同时调用不同的方法.Parallel.ForEach()方法用于数据的并行性,Parallel.Invoke()方法用于任务的并行性. 1.For()方法 For()方法用于多次执行一个任务,可以并行运行迭代,但迭代的顺序并没指定.Fo

  • C#使用Parallel类进行多线程编程实例

    本文实例讲述了C#使用 Parallel 类进行多线程编程的方法.分享给大家供大家参考.具体如下: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Diagnostics; using System.Runtime.InteropServic

  • C#并行编程之数据并行Tasks.Parallel类

    目录 一.并行概念 1.并行编程 2.数据并行 二.Parallel.Invoke():并行调用多个任务 . 三.Parallel.For(): for 循环的并行运算 四.Parallel.ForEach():foreach 循环的并行运算 五.线程局部变量 1.Parallel.For中定义局部变量: 2.Parallel.Each中定义局部变量: 六.Break.Stop中断与停止线程 七.Cancel取消循环 八.Handel Exceptions异常处理 一.并行概念 1.并行编程 在

  • 异步/多线程/任务/并行编程之一:如何选择合适的多线程模型?

    异步.多线程.任务.并行编程之一:选择合适的多线程模型 本篇概述: @FCL4.0中已经存在的线程模型,以及它们之间异同点: @多线程编程模型的选择. 1:异步.多线程.任务.并行的本质 这四个概念对应在CLR中的本质,本质都是多线程. 异步,简单的讲就是BeginInvoke.EndInvoke模式,它在CLR内部线程池进行管理: 多线程,体现在C#中,可以由类型Thread发起.也可以由ThreadPool发起.前者不受CLR线程池管理,后者则是.FCL团队为了各种编程模型的方便,还另外提供

  • 浅谈.Net并行计算之数据并行

    从第一台计算机问世到现在计算机硬件技术已经有了很大的发展.不管是现在个人使用的PC还是公司使用的服务器.双核,四核,八核的CPU已经非常常见.这样我们可以将我们程序分摊到多个计算机CPU中去计算,在过去并行化需要线程的低级操作,难度很大,在.net4.0中的增强了对并行化的支持,使得这一切变得非常简单 .本次我从以下几个方面大家讲以下.NET 并行 1.      数据并行2.      任务并行3.      并行Linq4.      任务工厂5.      注意事项 本次主要给大家讲一下数

  • python中的多cpu并行编程

    目录 多cpu并行编程 安装 使用 submit 函数定义 多核cpu并行计算 多cpu并行编程 python多线程只能算并发,因为它智能使用一个cpu内核 python下pp包支持多cpu并行计算 安装 pip install pp 使用 #-*- coding: UTF-8 -*- import math, sys, time import pp def IsPrime(n): """返回n是否是素数""" if not isinstance

  • C#并行编程之信号量

    一:CountdownEvent 这种采用信号状态的同步基元非常适合在动态的fork,join的场景,它采用“信号计数”的方式,就比如这样,一个麻将桌只能容纳4个人打麻将,如果后来的人也想搓一把碰碰运气,那么他必须等待直到麻将桌上的人走掉一位.好,这就是简单的信号计数机制,从技术角度上来说它是定义了最多能够进入关键代码的线程数. 但是CountdownEvent更牛X之处在于我们可以动态的改变“信号计数”的大小,比如一会儿能够容纳8个线程,一下又4个,一下又10个,这样做有什么好处呢?还是承接上

  • Mapreduce分布式并行编程

    目录 1.什么是并行计算 2.现在mapreduce能做什么? map:映射 reduce:做比较,工作整合,上下游 有些操作放在map.reduce里面都可以 1.project(投射)map完成 2.filter(过滤)map完成 3.key(汇集) 数据SQL: oss和hive的区别? 1. 搭建各类环境 2. 搭建.配置zookeeper 3. 启动zookeeper 4. 安装配置java 5. 主从节点格式化 6. 启动集群 7. 安装Scala 8. 启动spark集群 9. j

  • OpenMP 共享内存的并行编程框架入门详解

    目录 简介 认识 openmp 的简单易用性 C 语言实现 C++ 实现 OpenMP 实现 opnemp 基本原理 积分例子 总结 简介 OpenMP 一个非常易用的共享内存的并行编程框架,它提供了一些非常简单易用的API,让编程人员从复杂的并发编程当中释放出来,专注于具体功能的实现.openmp 主要是通过编译指导语句以及他的动态运行时库实现,在本篇文章当中我们主要介绍 openmp 一些入门的简单指令的使用. 认识 openmp 的简单易用性 比如现在我们有一个任务,启动四个线程打印 he

  • Guava - 并行编程Futures详解

    Guava为Java并行编程Future提供了很多有用扩展,其主要接口为ListenableFuture,并借助于Futures静态扩展. 继承至Future的ListenableFuture,允许我们添加回调函数在线程运算完成时返回值或者方法执行完成立即返回. 对ListenableFuture添加回调函数: Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor) 其中 FutureCallbac

  • 浅谈在Java中使用Callable、Future进行并行编程

    使用Callable.Future进行并行编程 在Java中进行并行编程最常用的方式是继承Thread类或者实现Runnable接口.这两种方式的缺点是在任务完成后无法直接获取执行结果,必须通过共享变量或线程间通信,使用起来很不方便. 从Java1.5开始提供了Callable和Future两个接口,通过使用它们可以在任务执行完毕后得到执行结果. 下面我们来学习下如何使用Callable.Future和FutureTask. Callable接口 Callable接口位于java.util.co

  • C#并行库Parallel类介绍

    Parallel.Invoke 这个函数的功能和Task有些相似,就是并发执行一系列任务,然后等待所有完成.和Task比起来,省略了Task.WaitAll这一步,自然也缺少了Task的相关管理功能.它有两种形式: Parallel.Invoke( params Action[] actions); Parallel.Invoke(Action[] actions,TaskManager manager,TaskCreationOptions options); 第二种方式可以自定义一个Task

随机推荐