C#的并发机制优秀在哪你知道么

目录
  • 一行没用的代码却提高了效率?
  • 看似没用的Invoke到底有什么用
  • ​深度解读,为何要加两把锁
  • 总结

笔者上次用C#写.Net代码差不多还是10多年以前,由于当时Java已经颇具王者风范,Net几乎被打得溃不成军。因此当时笔者对于这个.Net的项目态度比较敷衍了事,没有对其中一些优秀机制有很深的了解,在去年写《C和Java没那么香了,高并发时代谁能称王》时都没给.Net以一席之地,不过最近恰好机缘巧合,我又接手了一个Windows方面的项目,这也让我有机会重新审视一下自己关于.Net框架的相关知识。

项目原型要实现的功能并不复杂,主要就是记录移动存储设备中文件拷出的记录,而且需要尽可能少的占用系统资源,而在开发过程中我无意中加了一行看似没有任何效果的代码,使用Invoke方法记录文件拷出情况,这样的操作却让程序执行效率明显会更高,这背后的原因特别值得总结。

一行没用的代码却提高了效率?

由于我需要记录的文件拷出信息并没有回显在UI的需要,因此也就没考虑并发冲突的问题,在最初版本的实现中,我对于filesystemwatcher的回调事件,都是直接处理的,如下:

private void DeleteFileHandler(object sender, FileSystemEventArgs e)
        {
            if(files.Contains(e.FullPath))
            {
                files.Remove(e.FullPath);
               //一些其它操作
            }
        }

这个程序的处理效率在普通的办公PC上如果同时拷出20个文件,那么在拷贝过程中,U盘监测程序的CPU使用率大约是0.7%。

但是一个非常偶然的机会,我使用了Event/Delegate的Invoke机制,结果发现这样一个看似的废操作,却让程序的CPU占用率下降到0.2%左右

 private void UdiskWather_Deleted(object sender, FileSystemEventArgs e)
        {
            if(this.InvokeRequired)
            {
                this.Invoke(new DeleteDelegate(DeleteFileHandler), new object[] { sender,e });               }
            else
            {
                DeleteFileHandler(sender, e);
            }
        }

在我最初的认识中.net中的Delegate机制在调用过程中是要进行拆、装箱操作的,因此这不拖慢操作就不错了,但实际的验证结果却相反。​

看似没用的Invoke到底有什么用

这里先给出结论,Invoke能提升程序执行效率,其关键还是在于线程在多核之间切换的消耗要远远高于拆、装箱的资源消耗,我们知道我们程序的核心就是操作files这个共享变量,每次在被检测的U盘目录中如果发生文件变动,其回调通知函数可能都运行在不同的线程,如下:

Invoke机制的背后其实就是保证所有对于files这个共享变量的操作,全部都是由一个线程执行完成的。

目前.Net的代码都开源的,下面我们大致讲解一下Invoke的调用过程,不管是BeginInvoke还是Invoke背后其实都是调用的MarshaledInvoke方法来完成的,如下:

​
public IAsyncResult BeginInvoke(Delegate method, params Object[] args) {
            using (new MultithreadSafeCallScope()) {
                Control marshaler = FindMarshalingControl();
                return(IAsyncResult)marshaler.MarshaledInvoke(this, method, args, false);
            }
        }
​

MarshaledInvoke的主要工作是创建ThreadMethodEntry对象,并把它放在一个链表里进行管理,然后调用PostMessage将相关信息发给要通信的线程,如下:

​
private Object MarshaledInvoke(Control caller, Delegate method, Object[] args, bool synchronous) {
            if (!IsHandleCreated) {
                throw new InvalidOperationException(SR.GetString(SR.ErrorNoMarshalingThread));
            }
            ActiveXImpl activeXImpl = (ActiveXImpl)Properties.GetObject(PropActiveXImpl);
            if (activeXImpl != null) {
                IntSecurity.UnmanagedCode.Demand();
            }
            // We don't want to wait if we're on the same thread, or else we'll deadlock.
            // It is important that syncSameThread always be false for asynchronous calls.
            //
            bool syncSameThread = false;
            int pid; // ignored
            if (SafeNativeMethods.GetWindowThreadProcessId(new HandleRef(this, Handle), out pid) == SafeNativeMethods.GetCurrentThreadId()) {
                if (synchronous)
                    syncSameThread = true;
            }
            // Store the compressed stack information from the thread that is calling the Invoke()
            // so we can assign the same security context to the thread that will actually execute
            // the delegate being passed.
            //
            ExecutionContext executionContext = null;
            if (!syncSameThread) {
                executionContext = ExecutionContext.Capture();
            }
            ThreadMethodEntry tme = new ThreadMethodEntry(caller, this, method, args, synchronous, executionContext);
            lock (this) {
                if (threadCallbackList == null) {
                    threadCallbackList = new Queue();
                }
            }
            lock (threadCallbackList) {
                if (threadCallbackMessage == 0) {
                    threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion + "_ThreadCallbackMessage");
                }
                threadCallbackList.Enqueue(tme);
            }
            if (syncSameThread) {
                InvokeMarshaledCallbacks();
            }  else {
                //
                UnsafeNativeMethods.PostMessage(new HandleRef(this, Handle), threadCallbackMessage, IntPtr.Zero, IntPtr.Zero);
            }
            if (synchronous) {
                if (!tme.IsCompleted) {
                    WaitForWaitHandle(tme.AsyncWaitHandle);
                }
                if (tme.exception != null) {
                    throw tme.exception;
                }
                return tme.retVal;
            }
            else {
                return(IAsyncResult)tme;
            }
        }
​

Invoke的机制就保证了一个共享变量只能由一个线程维护,这和GO语言使用通信来替代共享内存的设计是暗合的,他们的理念都是 "让同一块内存在同一时间内只被一个线程操作" 。这和现代计算体系结构的多核CPU(SMP)有着密不可分的联系,

这里我们先来科普一下CPU之间的通信MESI协议的内容。我们知道现代的CPU都配备了高速缓存,按照多核高速缓存同步的MESI协议约定,每个缓存行都有四个状态,分别是E(exclusive)、M(modified)、S(shared)、I(invalid),其中:

M:代表该缓存行中的内容被修改,并且该缓存行只被缓存在该CPU中。这个状态代表缓存行的数据和内存中的数据不同。

E:代表该缓存行对应内存中的内容只被该CPU缓存,其他CPU没有缓存该缓存对应内存行中的内容。这个状态的缓存行中的数据与内存的数据一致。

I:代表该缓存行中的内容无效。

S:该状态意味着数据不止存在本地CPU缓存中,还存在其它CPU的缓存中。这个状态的数据和内存中的数据也是一致的。不过只要有CPU修改该缓存行都会使该行状态变成 I 。

四种状态的状态转移图如下:

​我们上文也提到了,不同的线程是有大概率是运行在不同CPU核上的,在不同CPU操作同一块内存时,站在CPU0的角度上看,就是CPU1会不断发起remote write的操作,这会使该高速缓存的状态总是会在S和I之间进行状态迁移,而一旦状态变为I将耗费比较多的时间进行状态同步。

因此我们可以基本得出 this.Invoke(new DeleteDelegate(DeleteFileHandler), new object[] { sender,e });   ;这行看似无关紧要的代码之后,无意中使files共享变量的维护操作,由多核多线程共同操作,变成了众多子线程向主线程通信,所有维护操作均由主线程进行,这也使最终的执行效率有所提高。

​深度解读,为何要加两把锁

在当前使用通信替代共享内存的大潮之下,锁其实是最重要的设计。

我们看到在.Net的Invoke实现中,使用了两把锁lock (thislock (threadCallbackList)

lock (this) {
                if (threadCallbackList == null) {
                    threadCallbackList = new Queue();
                }
            }
            lock (threadCallbackList) {
                if (threadCallbackMessage == 0) {
                    threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion + "_ThreadCallbackMessage");
                }
                threadCallbackList.Enqueue(tme);
            }

在.NET当中lock关键字的基本可以理解为提供了一个近似于CAS的锁(Compare And Swap)。CAS的原理不断地把"期望值"和"实际值"进行比较,当它们相等时,说明持有锁的CPU已经释放了该锁,那么试图获取这把锁的CPU就会尝试将"new"的值(0)写入"p"(交换),以表明自己成为spinlock新的owner。伪代码演示如下:

void CAS(int p, int old,int new)
{
    if *p != old
        do nothing
    else
     *p ← new
}

基于CAS的锁效率没问题,尤其是在没有多核竞争的情况CAS表现得尤其优秀,但CAS最大的问题就是不公平,因为如果有多个CPU同时在申请一把锁,那么刚刚释放锁的CPU极可能在下一轮的竞争中获取优势,再次获得这把锁,这样的结果就是一个CPU忙死,而其它CPU却很闲,我们很多时候诟病多核SOC“一核有难,八核围观”其实很多时候都是由这种不公平造成的。

为了解决CAS的不公平问题,业界大神们又引入了TAS(Test And Set Lock)机制,个人感觉还是把TAS中的T理解为Ticket更好记一些,TAS方案中维护了一个请求该锁的头尾索引值,由"head"和"tail"两个索引组成。

struct lockStruct{
    int32 head;
    int32 tail;
} ;

"head"代表请求队列的头部,"tail"代表请求队列的尾部,其初始值都为0。

最一开始时,第一个申请的CPU发现该队列的tail值是0,那么这个CPU会直接获取这把锁,并会把tail值更新为1,并在释放该锁时将head值更新为1。

在一般情况下当锁被持有的CPU释放时,该队列的head值会被加1,当其他CPU在试图获取这个锁时,锁的tail值获取到,然后把这个tail值加1,并存储在自己专属的寄存器当中,然后再把更新后的tail值更新到队列的tail当中。接下来就是不断地循环比较,判断该锁当前的"head"值,是否和自己存储在寄存器中的"tail"值相等,相等时则代表成功获得该锁。

TAS这类似于用户到政务大厅去办事时,首先要在叫号机取号,当工作人员广播叫到的号码与你手中的号码一致时,你就获取了办事柜台的所有权。

但是TAS却存在一定的效率问题,根据我们上文介绍的MESI协议,这个lock的头尾索引其实是在各个CPU之间共享的,因此tail和head频繁更新,还是会引发调整缓存不停的invalidate,这会极大的影响效率。

因此我们看到在.Net的实现中干脆就直接引入了threadCallbackList的队列,并不断将tme(ThreadMethodEntry)加入队尾,而接收消息的进程,则不断从队首获取消息.

lock (threadCallbackList) {
                if (threadCallbackMessage == 0) {
                    threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion + "_ThreadCallbackMessage");
                }
                threadCallbackList.Enqueue(tme);
            }

当队首指向这个tme时,消息才被发送,其实是一种类似于MAS的实现,当然MAS实际是为每个CPU都建立了一个专属的队列,和Invoke的设计略有不同,不过基本的思想是一致的。

总结

本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注我们的更多内容!

(0)

相关推荐

  • C#实现控制线程池最大数并发线程

    1. 实验目的: 使用线程池的时候,有时候需要考虑服务器的最大线程数目和程序最快执行所有业务逻辑的取舍. 并非逻辑线程越多也好,而且新的逻辑线程必须会在线程池的等待队列中等待 ,直到线程池中工作的线程执行完毕, 才会有系统线程取出等待队列中的逻辑线程,进行CPU运算. 2.  解决问题: <a>如果不考虑服务器实际可支持的最大并行线程个数,程序不停往线程池申请新的逻辑线程,这个时候我们可以发现CPU的使用率会不断飙升,并且内存.网络带宽占用也会随着逻辑线程在CPU队列中堆积,而不断增大. &l

  • 详细聊聊C#的并发机制优秀在哪

    目录 前言 一行没用的代码却提高了效率? ​ 看似没用的Invoke到底有什么用 ​深度解读,为何要加两把锁 总结 前言 笔者上次用C#写.Net代码差不多还是10多年以前,由于当时Java已经颇具王者风范,Net几乎被打得溃不成军.因此当时笔者对于这个.Net的项目态度比较敷衍了事,没有对其中一些优秀机制有很深的了解,在去年写<C和Java没那么香了,高并发时代谁能称王>时都没给.Net以一席之地,不过最近恰好机缘巧合,我又接手了一个Windows方面的项目,这也让我有机会重新审视一下自己关

  • c#编写的高并发数据库控制访问代码

    代码的作用在于保证在上端缓存服务失效(一般来说概率比较低)时,形成倒瓶颈,从而能够保护数据库,数据库宕了,才是大问题(比如影响其他应用). 假设(非完全正确数据,仅做示例): 每秒支持10,000,000次查询(千万); 一次读库需要耗时:1ms; 修改内存变量需要耗时:0.001ms; 那么: 每秒最终访问的数据库的请求数量 < 1000 其他的9,900,000个请求会返回到其他页面.这就是为啥很多抢单网站有人可以访问,而有人得到繁忙中页面的原因. 微观到1ms来看,在currentVali

  • C#并发编程入门教程之概述

    写在前面 并发编程一直都存在,只不过过去的很长时间里,比较难以实现,随着互联网的发展,人口红利的释放,更加友好的支持并发编程已经成了主流编程语言的标配,而对于软件开发人员来说,没有玩过并发编程都会有点不好意思.本系列文章将会以C#语言为主,详细介绍并发编程. 什么是并发编程,其实很简单,并发编程就是在一台处理器上同时做多件事情,并发编程的目标就是充分利用处理器的每一个核,以达到最高的处理性能.举个例子,服务器在响应第一个请求的同时响应第二个请求. 关于并发编程的几个误解 误解一:并发编程就是多线

  • C#的并发机制优秀在哪你知道么

    目录 一行没用的代码却提高了效率? 看似没用的Invoke到底有什么用 ​深度解读,为何要加两把锁 总结 笔者上次用C#写.Net代码差不多还是10多年以前,由于当时Java已经颇具王者风范,Net几乎被打得溃不成军.因此当时笔者对于这个.Net的项目态度比较敷衍了事,没有对其中一些优秀机制有很深的了解,在去年写<C和Java没那么香了,高并发时代谁能称王>时都没给.Net以一席之地,不过最近恰好机缘巧合,我又接手了一个Windows方面的项目,这也让我有机会重新审视一下自己关于.Net框架的

  • 浅谈Go语言并发机制

    Go 语言相比Java等一个很大的优势就是可以方便地编写并发程序.Go 语言内置了 goroutine 机制,使用goroutine可以快速地开发并发程序, 更好的利用多核处理器资源.这篇文章学习goroutine 的应用及其调度实现. 一.Go语言对并发的支持 使用goroutine编程 使用 go 关键字用来创建 goroutine .将go声明放到一个需调用的函数之前,在相同地址空间调用运行这个函数,这样该函数执行时便会作为一个独立的并发线程.这种线程在Go语言中称作goroutine.

  • Golang CSP并发机制及使用模型

    目录 CSP并发模型 Golang CSP Channel Goroutine Goroutine 调度器 总结 今天介绍一下 go语言的并发机制以及它所使用的CSP并发模型 CSP并发模型 CSP模型是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型. CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel. Golang CSP Golang 就是借用CSP模型的一些概念为之实现并发进行理论

  • GoLang并发机制探究goroutine原理详细讲解

    目录 1. 进程与线程 2. goroutine原理 3. 并发与并行 3.1 在1个逻辑处理器上运行Go程序 3.2 goroutine的停止与重新调度 3.3 在多个逻辑处理器上运行Go程序 通常程序会被编写为一个顺序执行并完成一个独立任务的代码.如果没有特别的需求,最好总是这样写代码,因为这种类型的程序通常很容易写,也很容易维护.不过也有一些情况下,并行执行多个任务会有更大的好处.一个例子是,Web 服务需要在各自独立的套接字(socket)上同时接收多个数据请求.每个套接字请求都是独立的

  • Redis处理高并发机制原理及实例解析

    1.Redis是基于内存的,内存的读写速度非常快: 2.Redis是单线程的,省去了很多上下文切换线程的时间: 3.Redis使用多路复用技术,可以处理并发的连接.非阻塞IO 内部实现采用epoll,采用了epoll+自己实现的简单的事件框架.epoll中的读.写.关闭.连接都转化成了事件,然后利用epoll的多路复用特性,绝不在io上浪费一点时间. 下面重点介绍单线程设计和IO多路复用核心设计快的原因 为什么Redis是单线程的 1.官方答案 因为Redis是基于内存的操作,CPU不是Redi

  • python并发编程之多进程、多线程、异步和协程详解

    最近学习python并发,于是对多进程.多线程.异步和协程做了个总结. 一.多线程 多线程就是允许一个进程内存在多个控制权,以便让多个函数同时处于激活状态,从而让多个函数的操作同时运行.即使是单CPU的计算机,也可以通过不停地在不同线程的指令间切换,从而造成多线程同时运行的效果. 多线程相当于一个并发(concunrrency)系统.并发系统一般同时执行多个任务.如果多个任务可以共享资源,特别是同时写入某个变量的时候,就需要解决同步的问题,比如多线程火车售票系统:两个指令,一个指令检查票是否卖完

  • PHP curl 并发最佳实践代码分享

    本文将探讨两种具体的实现方法, 并对不同的方法做简单的性能对比. 1. 经典cURL并发机制及其存在的问题 经典的cURL实现机制在网上很容易找到, 比如参考PHP在线手册的如下实现方式: 复制代码 代码如下: function classic_curl($urls, $delay) { $queue = curl_multi_init(); $map = array(); foreach ($urls as $url) { // create cURL resources $ch = curl

  • php cURL和Rolling cURL并发方式比较

    在实际项目或者自己编写小工具(比如新闻聚合,商品价格监控,比价)的过程中, 通常需要从第3方网站或者API接口获取数据, 在需要处理1个URL队列时, 为了提高性能, 可以采用cURL提供的curl_multi_*族函数实现简单的并发.本文将探讨两种具体的实现方法, 并对不同的方法做简单的性能对比.1. 经典cURL并发机制及其存在的问题经典的cURL实现机制在网上很容易找到, 比如参考PHP在线手册的如下实现方式: 复制代码 代码如下: function classic_curl($urls,

  • 详解Golang 中的并发限制与超时控制

    前言 上回在 用 Go 写一个轻量级的 ssh 批量操作工具里提及过,我们做 Golang 并发的时候要对并发进行限制,对 goroutine 的执行要有超时控制.那会没有细说,这里展开讨论一下. 以下示例代码全部可以直接在 The Go Playground上运行测试: 并发 我们先来跑一个简单的并发看看 package main import ( "fmt" "time" ) func run(task_id, sleeptime int, ch chan st

随机推荐