.NET Core 中的并发编程

并发编程 - 异步 vs. 多线程代码
并行编程是一个广泛的术语,我们应该通过观察异步方法和实际的多线程之间的差异展开探讨。 尽管 .NET Core 使用了任务来表达同样的概念,一个关键的差异是内部处理的不同。 调用线程在做其他事情时,异步方法在后台运行。这意味着这些方法是 I/O 密集型的,即他们大部分时间用于输入和输出操作,例如文件或网络访问。 只要有可能,使用异步 I/O 方法代替同步操作很有意义。相同的时间,调用线程可以在处理桌面应用程序中的用户交互或处理服务器应用程序中的同时处理其他请求,而不仅仅是等待操作完成。

你可以在我的文章 Asynchronous Programming in C# using Async Await – Best Practices 中阅读更多关于使用 async 和 await 调用异步方法。该文章来自 DNC Magazine (9月刊) 。

计算密集型的方法要求 CPU 周期工作,并且只能运行在他们专用的后台线程中。CPU 的核心数限制了并行运行时的可用线程数量。操作系统负责在剩余的线程之间切换,使他们有机会执行代码。 这些方法仍然被并发地执行,却不必被并行地执行。尽管这意味着方法不是同时执行,却可以在其他方法暂停的时候执行。

并行 vs 并发
本文将在最后一段中重点介绍 在 .NET Core中多线程并发编程。

任务并行库

.NET Framework 4 引入了任务并行库 (TPL) 作为编写并发代码的首选 API。.NET Core采用相同的编程模式。 要在后台运行一段代码,需要将其包装成一个 任务:

1
2
3
var backgroundTask = Task.Run(() => DoComplexCalculation(42));
// do other work
var result = backgroundTask.Result;
当需要返回结果时,Task.Run 方法接收一个 函数 (Func) ;当不需要返回结果时,方法 Task.Run 接收一个 动作 (Action) 。当然,所有的情况下都可以使用 lambda 表达式,就像我上面例子中调用带一个参数的长时间方法。 线程池中的某个线程将会处理任务。.NET Core 的运行时包含一个默认调度程序,使用线程池来处理队列并执行任务。您可以通过派生 TaskScheduler 类实现自己的调度算法,代替默认的,但这超过本文的讨论范围。 正如我们之前所见,我使用 Result 属性来合并被调用的后台线程。对于不需要返回结果的线程,我可以调用 Wait() 来代替。这两种方式都将被堵塞到后台任务完成。 为了避免堵塞调用线程 ( 如在ASP.NET Core应用程序中) ,可以使用 await 关键字:

1
2
3
var backgroundTask = Task.Run(() => DoComplexCalculation(42));
// do other work
var result = await backgroundTask;
这样被调用的线程将被释放以便处理其他传入请求。一旦任务完成,一个可用的工作线程将会继续处理请求。当然,控制器动作方法必须是异步的:

1
public async Task<iactionresult> Index() { // method body }
处理异常
将两个线程合并在一起的时候,任务抛出的任何异常将被传递到调用线程中:

如果使用 Result 或 Wait() ,它们将被打包到 AggregateException 中。实际的异常将被抛出并存储在其 InnerException 属性中。
如果您使用 await,原来的异常将不会被打包。
在这两种情况下,调用堆栈的信息将保持不变。

取消任务
由于任务是可以长时间运行的,所以你可能想要有一个可以提前取消任务的选项。实现这个选项,需要在任务创建的时候传入取消的令牌 (token),之后再使用令牌触发取消任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
var tokenSource = new CancellationTokenSource();
var cancellableTask = Task.Run(() =>
{
for (int i = 0; i < 100; i++)
{
if (tokenSource.Token.IsCancellationRequested)
{
// clean up before exiting
tokenSource.Token.ThrowIfCancellationRequested();
}
// do long-running processing
}
return 42;
}, tokenSource.Token);
// cancel the task
tokenSource.Cancel();
try
{
await cancellableTask;
}
catch (OperationCanceledException e)
{
// handle the exception
} 
实际上,为了提前取消任务,你需要检查任务中的取消令牌,并在需要取消的时候作出反应:在执行必要的清理操作后,调用 ThrowIfCancellationRequested() 退出任务。这个方法将会抛出 OperationCanceledException,以便在调用线程中执行相应的处理。

协调多任务
如果你需要运行多个后台任务,这里有些方法可以帮助到你。 要同时运行多个任务,只需连续启动它们并收集它们的引用,例如在数组中:

1
2
3
4
5
6
var backgroundTasks = new []
{
Task.Run(() => DoComplexCalculation(1)),
Task.Run(() => DoComplexCalculation(2)),
Task.Run(() => DoComplexCalculation(3))
};
现在你可以使用 Task 类的静态方法,等待他们被异步或者同步执行完毕。

1
2
3
4
5
6
// wait synchronously
Task.WaitAny(backgroundTasks);
Task.WaitAll(backgroundTasks);
// wait asynchronously
await Task.WhenAny(backgroundTasks);
await Task.WhenAll(backgroundTasks);
实际上,这两个方法最终都会返回所有任务的自身,可以像任何其他任务一样再次操作。为了获取对应任务的结果,你可以检查该任务的 Result 属性。 处理多任务的异常有点棘手。方法 WaitAll 和 WhenAll 不管哪个任务被收集到异常时都会抛出异常。不过,对于 WaitAll ,将会收集所有的异常到对应的 InnerExceptions 属性;对于 WhenAll ,只会抛出第一个异常。为了确认哪个任务抛出了哪个异常,您需要单独检查每个任务的 Status 和 Exception 属性。 在使用 WaitAny 和 WhenAny 时必须足够小心。他们会等到第一个任务完成 (成功或失败),即使某个任务出现异常时也不会抛出任何异常。他们只会返回已完成任务的索引或者分别返回已完成的任务。你必须等到任务完成或访问其 result 属性时捕获异常,例如:

1
2
3
4
5
6
7
8
9
var completedTask = await Task.WhenAny(backgroundTasks);
try
{
var result = await completedTask;
}
catch (Exception e)
{
// handle exception
}
如果你想连续运行多个任务,代替并发任务,可以使用延续 (continuations)的方式:

1
2
3
var compositeTask = Task.Run(() => DoComplexCalculation(42))
.ContinueWith(previous => DoAnotherComplexCalculation(previous.Result),
TaskContinuationOptions.OnlyOnRanToCompletion)
ContinueWith() 方法允许你把多个任务一个接着一个执行。这个延续的任务将获取到前面任务的结果或状态的引用。 你仍然可以增加条件判断是否执行延续任务,例如只有在前面任务成功执行或者抛出异常时。对比连续等待多个任务,提高了灵活性。 当然,您可以将延续任务与之前讨论的所有功能相结合:异常处理、取消和并行运行任务。这就有了很大的表演空间,以不同的方式进行组合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var multipleTasks = new[]
{
Task.Run(() => DoComplexCalculation(1)),
Task.Run(() => DoComplexCalculation(2)),
Task.Run(() => DoComplexCalculation(3))
};
var combinedTask = Task.WhenAll(multipleTasks);

var successfulContinuation = combinedTask.ContinueWith(task =>
CombineResults(task.Result), TaskContinuationOptions.OnlyOnRanToCompletion);
var failedContinuation = combinedTask.ContinueWith(task =>
HandleError(task.Exception), TaskContinuationOptions.NotOnRanToCompletion);

await Task.WhenAny(successfulContinuation, failedContinuation);
任务同步
如果任务是完全独立的,那么我们刚才看到的协调方法就已足够。然而,一旦需要同时共享数据,为了防止数据损坏,就必须要有额外的同步。 两个以及更多的线程同时更新一个数据结构时,数据很快就会变得不一致。就好像下面这个示例代码一样:

1
2
3
4
5
6
7
8
9
10
var counters = new Dictionary< int, int >();

if (counters.ContainsKey(key))
{
counters[key] ++;
}
else
{
counters[key] = 1;
}
当多个线程同时执行上述代码时,不同线程中的特定顺序执行指令可能导致数据不正确,例如:

所有线程将会检查集合中是否存在同一个 key
结果,他们都会进入 else 分支,并将这个 key 的值设为1
最后结果将会是1,而不是2。如果是接连着执行代码的话,将会是预期的结果。
上述代码中,临界区 (critical section) 一次只允许一个线程可以进入。在C# 中,可以使用 lock 语句来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
var counters = new Dictionary< int, int >();

lock (syncObject)
{
if (counters.ContainsKey(key))
{
counters[key]++;
}
else
{
counters[key] = 1;
}
}
在这个方法中,所有线程都必须共享相同的的 syncObject 。作为最佳做法,syncObject 应该是一个专用的 Object 实例,专门用于保护对一个独立的临界区的访问,避免从外部访问。 在 lock 语句中,只允许一个线程访问里面的代码块。它将阻止下一个尝试访问它的线程,直到前一个线程退出。这将确保线程完整执行临界区代码,而不会被另一个线程中断。当然,这将减少并行性并减慢代码的整体执行速度,因此您最好最小化临界区的数量并使其尽可能的短。

使用 Monitor 类来简化 lock 声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var lockWasTaken = false;
var temp = syncObject;
try
{
Monitor.Enter(temp, ref lockWasTaken);
// lock statement body
}
finally
{
if (lockWasTaken)
{
Monitor.Exit(temp);
}
}
尽管大部分时间您都希望使用 lock 语句,但 Monitor 类可以在需要时给予额外的控制。例如,您可以使用 TryEnter() 而不是 Enter(),并指定一个限定时间,避免无止境地等待锁释放。

其他同步基元
Monitor 只是 .NET Core 中众多同步基元的一员。根据实际情况,其他基元可能更适合。

Mutex 是 Monitor 更重量级的版本,依赖于底层的操作系统,提供跨多个进程同步访问资源[1], 是针对 Mutex 进行同步的推荐替代方案。

SemaphoreSlim 和 Semaphore 可以限制同时访问资源的最大线程数量,而不是像 Monitor 一样只能限制一个线程。 SemaphoreSlim 比 Semaphore 更轻量,但仅限于单个进程。如果可能,您最好使用 SemaphoreSlim 而不是 Semaphore。

ReaderWriterLockSlim 可以区分两种对访问资源的方式。它允许无限数量的读取器 (readers) 同时访问资源,并且限制同时只允许一个写入器 (writers) 访问锁定资源。读取时线程安全,但修改数据时需要独占资源,很好地保护了资源。

AutoResetEvent、ManualResetEvent 和 ManualResetEventSlim 将堵塞传入的线程,直到它们接收到一个信号 (即调用 Set() )。然后等待中的线程将继续执行。AutoResetEvent 在下一次调用 Set() 之前,将一直阻塞,并只允许一个线程继续执行。ManualResetEvent 和 ManualResetEventSlim 不会堵塞线程,除非 Reset() 被调用。ManualResetEventSlim 比前两者更轻量,更值得推荐。

Interlocked 提供一种选择——原子操作,这是替代 locking 和其他同步基元更好的选择(如果适用):

1
2
3
4
5
6
7
// non-atomic operation with a lock
lock (syncObject)
{
counter++;
}
// equivalent atomic operation that doesn't require a lock
Interlocked.Increment(ref counter);
并发集合
当一个临界区需要确保对数据结构的原子访问时,用于并发访问的专用数据结构可能是更好和更有效的替代方案。例如,使用 ConcurrentDictionary 而不是 Dictionary,可以简化 lock 语句示例:

1
2
3
4
5
6
7
var counters = new ConcurrentDictionary< int, int >();

counters.TryAdd(key, 0);
lock (syncObject)
{
counters[key]++;
}
自然地,也有可能像下面一样:

1
counters.AddOrUpdate(key, 1, (oldKey, oldValue) => oldValue + 1);
因为 update 的委托是临界区外面的方法,因此,第二个线程可能在第一个线程更新值之前,读取到同样的旧值,使用自己的值有效地覆盖了第一个线程的更新值,这就丢失了一个增量。错误使用并发集合也是无法避免多线程带来的问题。 并发集合的另一个替代方案是 不变的集合 (immutable collections)。 类似于并发集合,同样是线程安全的,但是底层实现是不一样的。任何关改变数据结构的操作将不会改变原来的实例。相反,它们返回一个更改后的副本,并保持原始实例不变:

1
2
var original = new Dictionary< int, int >().ToImmutableDictionary();
var modified = original.Add(key, value);
因此在一个线程中对集合任何更改对于其他线程来说都是不可见的。因为它们仍然引用原来的未修改的集合,这就是不变的集合本质上是线程安全的原因。 当然,这使得它们对于解决不同集合的问题很有效。最好的情况是多个线程在同一个输入集合的情况下,独立地修改数据,在最后一步可能为所有线程合并变更。而使用常规集合,需要提前为每个线程创建集合的副本。

并行LINQ (PLINQ)
并行LINQ (PLINQ) 是 Task Parallel Library 的替代方案。顾名思义,它很大程度上依赖于 LINQ(语言集成查询)功能。对于在大集合中执行相同的昂贵操作的场景是很有用的。与所有操作都是顺序执行的普通 LINQ to Objects 不同的是,PLINQ可以在多个CPU上并行执行这些操作。 发挥优势所需要的代码改动也是极小的:

1
2
3
4
5
6
7
8
9
10
// sequential execution
var sequential = Enumerable.Range(0, 40)
.Select(n => ExpensiveOperation(n))
.ToArray();

// parallel execution
var parallel = Enumerable.Range(0, 40)
.AsParallel()
.Select(n => ExpensiveOperation(n))
.ToArray();
如你所见,这两个代码片段的不同仅仅是调用 AsParallel()。这将IEnumerable 转换为 ParallelQuery,导致查询的部分并行运行。要切换为回顺序执行,您可以调用 AsSequential(),它将再次返回一个IEnumerable。 默认情况下,PLINQ 不保留集合中的顺序,以便让进程更有效率。但是当顺序很重要时,可以调用 AsOrdered():

1
2
3
4
5
var parallel = Enumerable.Range(0, 40)
.AsParallel()
.AsOrdered()
.Select(n => ExpensiveOperation(n))
.ToArray();
同理,你可以通过调用 AsUnordered() 切换回来。

在完整的 .NET Framework 中并发编程
由于 .NET Core 是完整的 .NET Framework 的简化实现,所以 .NET Framework 中所有并行编程方法也可以在.NET Core 中使用。唯一的例外是不变的集合,它们不是完整的 .NET Framework 的组成部分。它们作为单独的 NuGet 软件包(System.Collections.Immutable)分发,您需要在项目中安装使用。

结论:
每当应用程序包含可以并行运行的 CPU 密集型代码时,利用并发编程来提高性能并提高硬件利用率是很有意义的。 .NET Core 中的 API 抽象了许多细节,使编写并发代码更容易。然而需要注意某些潜在的问题, 其中大部分涉及从多个线程访问共享数据。 如果可以的话,你应该完全避免这种情况。如果不行,请确保选择最合适的同步方法或数据结构。

(0)

相关推荐

  • .net core并发下线程安全问题详解

    抱歉,其实内容并不如题!!! 背景(写测试demo所出现的异常,供大家学习与拍砖): .net core webapi项目,做了一个授权的filter(真正的生产项目的话,JWT很棒),单个接口测试没有问题,当用前端在同一个页面调用多个接口的时候,运行服务,打开页面,然后--Exceptions--(真正的开发中大家应该也会遇到) 异常1:An attempt was made to use the context while it is being configured. A DbContex

  • .net core并发请求发送HttpWebRequest的坑解决

    在framework中,大量并发 HttpWebRequest 需要设置一个最大连接数 ServicePointManager.DefaultConnectionLimit = 200; 但是在.net core中却无效,因为core不使用 ServicePointManager 管理连接数,在core中只有使用HttpClient,HttpCilentFactory来管理连接数,如果在core中使用 ServicePointManager 不但不起作用,并且大量并发使用 HttpWebRequ

  • .net core如何在网络高并发下提高JSON的处理效率详解

    前言 现有的webapi一般都基于JSON的格式来处理数据,由于JSON是一个文本类的序列化协议所以在性能上自然就相对低效一些.在.net中常用Newtonsoft.Json是最常用的组件,由于提供简便基于完整的json的String方法使用起来非常方便:但也正是这原因导致Newtonsoft.Json在性能上一直被说慢,虽然Newtonsoft.Json提供Stream的方式来处理JSON不过想复用writer和reader还是需要一些应用技巧.如果需要在网络通讯中应用JSON,那在这里介绍一

  • asp.net core 系列之并发冲突的深入理解

    本文介绍如何处理多个用户并发更新同一实 体(同时)时出现的冲突 . 主要是两种:一种,检查属性并发冲突,使用 [ConcurrencyCheck] ;另一种,检测行的并发冲突,使用 rowversion 跟踪属性,如果在保存之前有修改,就报错 发生并发冲突的情况: 1.用户导航到实体编辑页面: 2.第一个用户的更改还未写入数据库之前,另一个用户更新同一实体: 此时,如果未启用并发检测,当发生更新时: 最后一个更新优先.即最后一个更新的值保存到数据库.而第一个保存的值将丢失. 举个例子: 1. J

  • .NET Core 中的并发编程

    并发编程 - 异步 vs. 多线程代码 并行编程是一个广泛的术语,我们应该通过观察异步方法和实际的多线程之间的差异展开探讨. 尽管 .NET Core 使用了任务来表达同样的概念,一个关键的差异是内部处理的不同. 调用线程在做其他事情时,异步方法在后台运行.这意味着这些方法是 I/O 密集型的,即他们大部分时间用于输入和输出操作,例如文件或网络访问. 只要有可能,使用异步 I/O 方法代替同步操作很有意义.相同的时间,调用线程可以在处理桌面应用程序中的用户交互或处理服务器应用程序中的同时处理其他

  • Python中的并发编程实例

    一.简介 我们将一个正在运行的程序称为进程.每个进程都有它自己的系统状态,包含内存状态.打开文件列表.追踪指令执行情况的程序指针以及一个保存局部变量的调用栈.通常情况下,一个进程依照一个单序列控制流顺序执行,这个控制流被称为该进程的主线程.在任何给定的时刻,一个程序只做一件事情. 一个程序可以通过Python库函数中的os或subprocess模块创建新进程(例如os.fork()或是subprocess.Popen()).然而,这些被称为子进程的进程却是独立运行的,它们有各自独立的系统状态以及

  • 在.NET Core中使用异步编程的方法步骤

    近期对于异步和多线程编程有些启发,所以我决定把自己的理解写下来. 思考:为什么要使用异步编程? 我们先看看同步方法和异步方法之前在程序中执行的逻辑: 1. 同步方法 static void Main(string[] args) { Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ms")}:开始"); // 调用同步方法 SyncTestMethod(); Console.WriteL

  • Go并发编程实现数据竞争

    目录 1.前言 2.数据竞争 2.1 示例一 2.2 循环中使用goroutine引用临时变量 2.3 引起变量共享 2.4 不受保护的全局变量 2.5 未受保护的成员变量 2.6 接口中存在的数据竞争 3. 总结 4 参考 1.前言 虽然在 go 中,并发编程十分简单, 只需要使用 go func() 就能启动一个 goroutine 去做一些事情,但是正是由于这种简单我们要十分当心,不然很容易出现一些莫名其妙的 bug 或者是你的服务由于不知名的原因就重启了. 而最常见的bug是关于线程安全

  • Python并发编程之未来模块Futures

    目录 区分并发和并行 并发编程之Futures 到底什么是Futures? 为什么多线程每次只有一个线程执行? 总结 不论是哪一种语言,并发编程都是一项非常重要的技巧.比如我们上一章用的爬虫,就被广泛用在工业的各个领域.我们每天在各个网站.App上获取的新闻信息,很大一部分都是通过并发编程版本的爬虫获得的. 正确并合理的使用并发编程,无疑会给我们的程序带来极大性能上的提升.今天我们就一起学习Python中的并发编程——Futures. 区分并发和并行 我们在学习并发编程时,常常会听到两个词:并发

  • Go语言并发编程基础上下文概念详解

    目录 前言 1 Go 中的 Context 2 Context 接口 3 Context Tree 4 创建上下文 4.1 上下文创建函数 4.2 Context 使用规范 4.3 Context 使用场景 5 总结 前言 相信大家以前在做阅读理解的时候,一定有从老师那里学一个技巧或者从参考答案看个:结合上下文.根据上下文我们能够找到有助于解题的相关信息,也能更加了解段落的思想. 在开发过程中,也有这个上下文(Context)的概念,而且上下文也必不可少,缺少上下文,就不能获取完整的程序信息.那

  • 深入分析java并发编程中volatile的实现原理

    引言 在多线程并发编程中synchronized和Volatile都扮演着重要的角色,Volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的"可见性".可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值.它在某些情况下比synchronized的开销更小,本文将深入分析在硬件层面上Inter处理器是如何实现Volatile的,通过深入分析能帮助我们正确的使用Volatile变量. 术语定义 术语 英文单词 描述 共享变量 在多个线

  • Java 并发编程中如何创建线程

    简介 线程是基本的调度单位,它被包含在进程之中,是进程中的实际运作单位,它本身是不会独立存在.一个进程至少有一个线程,进程中的多个线程共享进程的资源. Java中创建线程的方式有多种如继承Thread类.实现Runnable接口.实现Callable接口以及使用线程池的方式,线程池将在后面文章中单独介绍,这里先介绍另外三种方式. 继承Thread类 优点:在run方法里可以用this获取到当前线程. 缺点:由于Java不支持多继承,所以如果继承了Thread类后就不能再继承其他类. public

  • 浅析Java 并发编程中的synchronized

    synchronized关键字,我们一般称之为"同步锁",用它来修饰需要同步的方法和需要同步代码块,默认是当前对象作为锁的对象.在用synchronized修饰类时(或者修饰静态方法),默认是当前类的Class对象作为锁的对象,故存在着方法锁.对象锁.类锁这样的概念. 一.没有设置线程同步的情况 先给出以下代码感受下代码执行的时候为什么需要同步?代码可能比较枯燥,配上业务理解起来就会舒服很多,学生军训,有三列,每列5人,需要报数,每个线程负责每一列报数. class Synchroni

  • Go并发编程中使用channel的方法

    目录 一.设计原理 二.数据结构 三.创建管道 四. 发送数据 4.1 直接发送 4.2 缓冲区 4.3 阻塞发送 4.4 小结 五. 接收数据 5.1 直接接收 5.2 缓冲区 5.3 阻塞接收 六. 关闭channel 七. 使用场景 7.1 使用channel控制子协程 7.2 通过关闭 channel 实现一对多的通知 7.3 使用 channel 做异步编程 7.4 超时控制 7.5 协程池 八. 参考 一.设计原理 Go 语言中最常见的.也是经常被人提及的设计模式就是: "不要通过共

随机推荐