C#多线程系列之多阶段并行线程

前言

这一篇,我们将学习用于实现并行任务、使得多个线程有序同步完成多个阶段的任务。

应用场景主要是控制 N 个线程(可随时增加或减少执行的线程),使得多线程在能够在 M 个阶段中保持同步。

线程工作情况如下:

我们接下来 将学习C# 中的 Barrier ,用于实现并行协同工作。

Barrier 类

使多个任务能够采用并行方式依据某种算法在多个阶段中协同工作,使多个线程(称为“参与者” )分阶段同时处理算法。

可以使多个线程(称为“参与者” )分阶段同时处理算法。(注意算法这个词)

每个参与者完成阶段任务后后将被阻止继续执行,直至所有参与者都已达到同一阶段。

Barrier 的构造函数如下:

构造函数 说明
Barrier(Int32) 初始化 Barrier 类的新实例。
Barrier(Int32, Action) 初始化 Barrier 类的新实例。

其中一个构造函数定义如下:

public Barrier (int participantCount, Action<Barrier> postPhaseAction);

participantCount :处于的线程数量,大于0并且小于32767。

postPhaseAction :在每个阶段后执行 Action(委托)。

属性和方法

在还没有清楚这个类有什么作用前,我们来看一下这个类的常用属性和方法。

大概了解 Barrier 有哪些常用属性和方法后,我们开始编写示例代码。

属性:

属性 说明
CurrentPhaseNumber 获取屏障的当前阶段的编号。
ParticipantCount 获取屏障中参与者的总数。
ParticipantsRemaining 获取屏障中尚未在当前阶段发出信号的参与者的数量。

方法:

方法 说明
AddParticipant() 通知 Barrier,告知其将会有另一个参与者。
AddParticipants(Int32) 通知 Barrier,告知其将会有多个其他参与者。
RemoveParticipant() 通知 Barrier,告知其将会减少一个参与者。
RemoveParticipants(Int32) 通知 Barrier,告知其将会减少一些参与者。
SignalAndWait() 发出参与者已达到屏障并等待所有其他参与者也达到屏障。
SignalAndWait(CancellationToken) 发出参与者已达到屏障的信号,并等待所有其他参与者达到屏障,同时观察取消标记。
SignalAndWait(Int32) 发出参与者已达到屏障的信号,并等待所有其他参与者也达到屏障,同时使用 32 位带符号整数测量超时。
SignalAndWait(Int32, CancellationToken) 发出参与者已达到屏障的信号,并等待所有其他参与者也达到屏障,使用 32 位带符号整数测量超时,同时观察取消标记。
SignalAndWait(TimeSpan) 发出参与者已达到屏障的信号,并等待所有其他参与者也达到屏障,同时使用 TimeSpan 对象测量时间间隔。
SignalAndWait(TimeSpan, CancellationToken) 发出参与者已达到屏障的信号,并等待所有其他参与者也达到屏障,使用 TimeSpan 对象测量时间间隔,同时观察取消标记。

Barrier 翻译屏障,前面所说的 “阶段”,在文档中称为屏障,官方有一些例子和实践场景:

https://docs.microsoft.com/zh-cn/dotnet/standard/threading/barrier?view=netcore-3.1

https://docs.microsoft.com/zh-cn/dotnet/standard/threading/how-to-synchronize-concurrent-operations-with-a-barrier?view=netcore-3.1

本文的教程比较简单,你可以先看本教程,再去看看官方示例。

示例

假设有个比赛,一个有三个环节,有三个小组参加比赛。

比赛有三个环节,小组完成一个环节后,可以去等待区休息,等待其他小组也完成比赛后,开始进行下一个环节的比赛。

示例如下:

new Barrier(int,Action) 设置有多少线程参与,Action 委托设置每个阶段完成后执行哪些动作。

.SignalAndWait() 阻止当前线程继续往下执行;直到其他完成也执行到此为止。

    class Program
    {
        // Barrier(Int32, Action)
        private static Barrier barrier = new Barrier(3, b =>
                            Console.WriteLine($"\n第 {b.CurrentPhaseNumber + 1} 环节的比赛结束,请评分!"));

        static void Main(string[] args)
        {
            // Random 模拟每个小组完成一个环节比赛需要的时间
            Thread thread1 = new Thread(() => DoWork("第一小组", new Random().Next(2, 10)));
            Thread thread2 = new Thread(() => DoWork("第二小组", new Random().Next(2, 10)));
            Thread thread3 = new Thread(() => DoWork("第三小组", new Random().Next(2, 10)));

            // 三个小组开始比赛
            thread1.Start();
            thread2.Start();
            thread3.Start();

            Console.ReadKey();
        }
        static void DoWork(string name, int seconds)
        {
            // 第一环节
            Console.WriteLine($"\n{name}:开始进入第一环节比赛");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));    // 模拟小组完成环节比赛需要的时间
            Console.WriteLine($"\n    {name}:完成第一环节比赛,等待其它小组");
            // 小组完成阶段任务,去休息等待其它小组也完成比赛
            barrier.SignalAndWait();

            // 第二环节
            Console.WriteLine($"\n        {name}:开始进入第二环节比赛");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            Console.WriteLine($"\n        {name}:完成第二环节比赛,等待其它小组\n");
            barrier.SignalAndWait();

            // 第三环节
            Console.WriteLine($"\n        {name}:开始进入第三环节比赛");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            Console.WriteLine($"\n        {name}:完成第三环节比赛,等待其它小组\n");
            barrier.SignalAndWait();
        }
    }

上面的示例中,每个线程都使用了 DoWork() 这个方法去中相同的事情,当然也可以设置多个线程执行不同的任务,但是必须保证每个线程都具有相同数量的 .SignalAndWait(); 方法。

当然 SignalAndWait() 可以设置等待时间,如果其他线程迟迟没有到这一步,那就继续运行。可以避免死锁等问题。

到目前,只使用了 SignalAndWait() ,我们继续学习一下 Barrier 类的其他方法。

新的示例

Barrier.AddParticipant():添加参与者;

Barrier.RemoveParticipant():移除参与者;

这里继续使用第二节的示例。

因为这是比赛,老是等待其他小组,会使得比赛进行比较慢。

新的规则:不必等待最后一名,当环节只剩下最后一名时为完成时,其它小组可以立即进行下一个环节的比赛。

​ 当然,最后一名小组,有权利继续完成比赛。

修改第二小节的代码,在 Main 内第一行加上 barrier.RemoveParticipant();

        static void Main(string[] args)
        {
            barrier.RemoveParticipant();
            ... ...

试着再运行一下。

说明

SignalAndWait() 的 重载比较多,例如 SignalAndWait(CancellationToken),这里笔者先不讲解此方法如何使用。等到写到后面的异步(Task),读者学到相关的知识点,我们再过一次复习,这样由易到难,自然水到渠成。

Barrier 适合用于同时执行相同流程的工作,因为工作内容是相同的,便于协同。工作流有可能用得上吧。

但是 Barrier 更加适合用于算法领域,可以参考:https://devblogs.microsoft.com/pfxteam/parallel-merge-sort-using-barrier/

到此这篇关于C#多线程系列之多阶段并行线程的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • C#多线程系列之线程的创建和生命周期

    目录 1,获取当前线程信息 2,管理线程状态 2.1 启动与参数传递 2.1.1 ParameterizedThreadStart 2.1.2 使用静态变量或类成员变量 2.1.3 委托与Lambda 2.2 暂停与阻塞 2.3 线程状态 2.4 终止 2.5 线程的不确定性 2.6 线程优先级.前台线程和后台线程 2.7 自旋和休眠 1,获取当前线程信息 Thread.CurrentThread 是一个 静态的 Thread 类,Thread 的CurrentThread 属性,可以获取到当前

  • C#多线程系列之线程通知

    AutoRestEvent 类用于从一个线程向另一个线程发送通知. 微软文档是这样介绍的:表示线程同步事件在一个等待线程释放后收到信号时自动重置. 其构造函数只有一个: 构造函数里面的参数用于设置信号状态. 构造函数 说明 AutoResetEvent(Boolean) 用一个指示是否将初始状态设置为终止的布尔值初始化 AutoResetEvent 类的新实例. 真糟糕的机器翻译. 常用方法 AutoRestEvent 类是干嘛的,构造函数的参数又是干嘛的?不着急,我们来先来看看这个类常用的方法

  • C#多线程系列之手动线程通知

    区别与示例 AutoResetEvent 和 ManualResetEvent 十分相似.两者之间的区别,在于前者是自动(Auto),后者是手动(Manua). 你可以先运行下面的示例,再测试两者的区别. AutoResetEvent 示例: class Program { // 线程通知 private static AutoResetEvent resetEvent = new AutoResetEvent(false); static void Main(string[] args) {

  • C#多线程系列之多线程锁lock和Monitor

    目录 1,Lock lock 原型 lock 编写实例 2,Monitor 怎么用呢 解释一下 示例 设置获取锁的时效 1,Lock lock 用于读一个引用类型进行加锁,同一时刻内只有一个线程能够访问此对象.lock 是语法糖,是通过 Monitor 来实现的. Lock 锁定的对象,应该是静态的引用类型(字符串除外). 实际上字符串也可以作为锁的对象使用,只是由于字符串对象的特殊性,可能会造成不同位置的不同线程冲突.如果你能保证字符串的唯一性,例如 Guid 生成的字符串,也是可以作为锁的对

  • C#多线程系列之进程同步Mutex类

    Mutex 中文为互斥,Mutex 类叫做互斥锁.它还可用于进程间同步的同步基元. Mutex 跟 lock 相似,但是 Mutex 支持多个进程.Mutex 大约比 lock 慢 20 倍. 互斥锁(Mutex),用于多线程中防止两条线程同时对一个公共资源进行读写的机制. Windows 操作系统中,Mutex 同步对象有两个状态: signaled:未被任何对象拥有: nonsignaled:被一个线程拥有: Mutex 只能在获得锁的线程中,释放锁. 构造函数和方法 Mutex 类其构造函

  • C#多线程系列之线程完成数

    解决一个问题 假如,程序需要向一个 Web 发送 5 次请求,受网路波动影响,有一定几率请求失败.如果失败了,就需要重试. 示例代码如下: class Program { private static int count = 0; static void Main(string[] args) { for (int i = 0; i < 5; i++) new Thread(HttpRequest).Start(); // 创建线程 // 用于不断向另一个线程发送信号 while (count

  • C#多线程系列之原子操作

    目录 知识点 竞争条件 线程同步 CPU时间片和上下文切换 阻塞 内核模式和用户模式 Interlocked 类 1,出现问题 2,Interlocked.Increment() 3,Interlocked.Exchange() 4,Interlocked.CompareExchange() 5,Interlocked.Add() 6,Interlocked.Read() 知识点 竞争条件 当两个或两个以上的线程访问共享数据,并且尝试同时改变它时,就发生争用的情况.它们所依赖的那部分共享数据,叫

  • C#多线程系列之资源池限制

    Semaphore.SemaphoreSlim 类 两者都可以限制同时访问某一资源或资源池的线程数. 这里先不扯理论,我们从案例入手,通过示例代码,慢慢深入了解. Semaphore 类 这里,先列出 Semaphore 类常用的 API. 其构造函数如下: 构造函数 说明 Semaphore(Int32, Int32) 初始化 Semaphore 类的新实例,并指定初始入口数和最大并发入口数. Semaphore(Int32, Int32, String) 初始化 Semaphore 类的新实

  • C#多线程系列之多阶段并行线程

    前言 这一篇,我们将学习用于实现并行任务.使得多个线程有序同步完成多个阶段的任务. 应用场景主要是控制 N 个线程(可随时增加或减少执行的线程),使得多线程在能够在 M 个阶段中保持同步. 线程工作情况如下: 我们接下来 将学习C# 中的 Barrier ,用于实现并行协同工作. Barrier 类 使多个任务能够采用并行方式依据某种算法在多个阶段中协同工作,使多个线程(称为“参与者” )分阶段同时处理算法. 可以使多个线程(称为“参与者” )分阶段同时处理算法.(注意算法这个词) 每个参与者完

  • C#多线程系列之线程池

    目录 线程池 ThreadPool 常用属性和方法 线程池说明和示例 线程池线程数 线程池线程数说明 不支持的线程池异步委托 任务取消功能 计时器 线程池 线程池全称为托管线程池,线程池受 .NET 通用语言运行时(CLR)管理,线程的生命周期由 CLR 处理,因此我们可以专注于实现任务,而不需要理会线程管理. 线程池的应用场景:任务并行库 (TPL)操作.异步 I/O 完成.计时器回调.注册的等待操作.使用委托的异步方法调用和套接字连接. 很多人不清楚 Task.Task<TResult>

  • C#多线程系列之线程等待

    目录 前言 volatile 关键字 三种常用等待 再说自旋和阻塞 SpinWait 结构 属性和方法 自旋示例 新的实现 SpinLock 结构 属性和方法 示例 等待性能对比 前言 volatile 关键字 volatile 关键字指示一个字段可以由多个同时执行的线程修改. 我们继续使用<C#多线程(3):原子操作>中的示例: static void Main(string[] args) { for (int i = 0; i < 5; i++) { new Thread(AddO

  • C#多线程系列之任务基础(三)

    目录 TaskAwaiter 延续的另一种方法 另一种创建任务的方法 实现一个支持同步和异步任务的类型 Task.FromCanceled() 如何在内部取消任务 Yield 关键字 补充知识点 TaskAwaiter 先说一下 TaskAwaiter,TaskAwaiter 表示等待异步任务完成的对象并为结果提供参数. Task 有个 GetAwaiter() 方法,会返回TaskAwaiter 或TaskAwaiter<TResult>,TaskAwaiter 类型在 System.Run

  • C#多线程系列之任务基础(二)

    目录 判断任务状态 再说父子任务 组合任务/延续任务 复杂的延续任务 并行(异步)处理任务 并行(同步)处理任务 并行任务的 Task.WhenAny 并行任务状态 循环中值变化问题 定时任务 TaskScheduler 类 判断任务状态 属性 说明 IsCanceled 获取此 Task 实例是否由于被取消的原因而已完成执行. IsCompleted 获取一个值,它表示是否已完成任务. IsCompletedSuccessfully 了解任务是否运行到完成. IsFaulted 获取 Task

  • SpringBoot 多任务并行+线程池处理的实现

    前言 前几篇文章着重介绍了后端服务数据库和多线程并行处理优化,并示例了改造前后的伪代码逻辑.当然了,优化是无止境的,前人栽树后人乘凉.作为我们开发者来说,既然站在了巨人的肩膀上,就要写出更加优化的程序. SpringBoot开发案例之JdbcTemplate批量操作 SpringBoot开发案例之CountDownLatch多任务并行处理 改造 理论上讲,线程越多程序可能更快,但是在实际使用中我们需要考虑到线程本身的创建以及销毁的资源消耗,以及保护操作系统本身的目的.我们通常需要将线程限制在一定

随机推荐