运用示例简单讲解C#取消令牌CancellationTokenSource

目录
  • 前言
  • 简单示例
  • 基础操作
  • 定时取消
  • 关联取消
  • 判断取消
  • 源码探究
  • 构造入手
  • 小插曲WaitHandle
  • 注册操作
  • 取消操作
  • Cancel操作
  • CancelAfter操作
  • 总结

前言

相信大家在使用C#进行开发的时候,特别是使用异步的场景,多多少少会接触到CancellationTokenSource。看名字就知道它和取消异步任务相关的,而且一看便知大名鼎鼎的CancellationToken就是它生产出来的。不看不知道,一看吓一跳。它在取消异步任务、异步通知等方面效果还是不错的,不仅好用而且够强大。无论是微软底层类库还是开源项目涉及到Task相关的,基本上都能看到它的身影,而微软近几年也是很重视框架中的异步操作,特别是在.NET Core上基本上能看到Task的地方就能看到CancellationTokenSource的身影。这次我们抱着学习的态度,来揭开它的神秘面纱。

简单示例

相信对于CancellationTokenSource基本的使用,许多同学已经非常熟悉了。不过为了能够让大家带入文章的节奏,我们还是打算先展示几个基础的操作,让大家找找感觉,回到那个熟悉的年代。

基础操作

首先呈现一个最基础的操作。

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("取消了???"));
cancellationToken.Register(() => System.Console.WriteLine("取消了!!!"));
cancellationToken.Register(state => System.Console.WriteLine($"取消了。。。{state}"),"啊啊啊");
System.Console.WriteLine("做了点别的,然后取消了.");
cancellationTokenSource.Cancel();

这个操作是最简单的操作,我们上面提到过CancellationTokenSource就是用来生产CancellationToken的,还可以说CancellationToken是CancellationTokenSource的表现,这个待会看源码的时候我们会知道为啥这么说。这里呢我们给CancellationToken注册几个操作,然后使用CancellationTokenSource的Cancel方法取消操作,这时候控制台就会打印结果如下

做了点别的,然后取消了.
取消了。。。啊啊啊
取消了!!!
取消了???

通过上面简单的示例,大家应该非常轻松的理解了它的简单使用。

定时取消

有的时候呢我们可能需要超时操作,比如我不想一直等着,到了一个固定的时间我就要取消操作,这时候我们可以利用CancellationTokenSource的构造函数给定一个限定时间,过了这个时间CancellationTokenSource就会被取消了,操作如下

//设置3000毫秒(即3秒)后取消
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(3000);
CancellationToken cancellationToken = cancellationTokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("我被取消了."));
System.Console.WriteLine("先等五秒钟.");
await Task.Delay(5000);
System.Console.WriteLine("手动取消.")
cancellationTokenSource.Cancel();

然后在控制台打印的结果是这个样子的,活脱脱的为我们实现了内建的超时操作。

先等五秒钟.
我被取消了.
手动取消.

上面的写法是在构造CancellationTokenSource的时候设置超时等待,还有另一种写法等同于这种写法,使用的是CancelAfter方法,具体使用如下

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Token.Register(() => System.Console.WriteLine("我被取消了."));
//五秒之后取消
cancellationTokenSource.CancelAfter(5000);
System.Console.WriteLine("不会阻塞,我会执行.");

这个操作也是定时取消操作,需要注意的是CancelAfter方法并不会阻塞执行,所以打印的结果是

不会阻塞,我会执行.
我被取消了.

关联取消

还有的时候是这样的场景,就是我们设置一组关联的CancellationTokenSource,我们期望的是只要这一组里的任意一个CancellationTokenSource被取消了,那么这个被关联的CancellationTokenSource就会被取消。说得通俗一点就是,我们几个当中只要一个不在了,那么你也可以不在了,具体的实现方式是这样的

//声明几个CancellationTokenSource
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationTokenSource tokenSource2 = new CancellationTokenSource();
CancellationTokenSource tokenSource3 = new CancellationTokenSource();

tokenSource2.Token.Register(() => System.Console.WriteLine("tokenSource2被取消了"));

//创建一个关联的CancellationTokenSource
CancellationTokenSource tokenSourceNew = CancellationTokenSource.CreateLinkedTokenSource(tokenSource.Token, tokenSource2.Token, tokenSource3.Token);
tokenSourceNew.Token.Register(() => System.Console.WriteLine("tokenSourceNew被取消了"));
//取消tokenSource2
tokenSource2.Cancel();

上述示例中因为tokenSourceNew关联了tokenSource、tokenSource2、tokenSource3所以只要他们其中有一个被取消那么tokenSourceNew也会被取消,所以上述示例的打印结果是

tokenSourceNew被取消了
tokenSource2被取消了

判断取消

上面我们使用的方式,都是通过回调的方式得知CancellationTokenSource被取消了,没办法通过标识去得知CancellationTokenSource是否可用。不过微软贴心的为我们提供了IsCancellationRequested属性去判断,需要注意的是它是CancellationToken的属性,具体使用方式如下

CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = tokenSource.Token;
//打印被取消
cancellationToken.Register(() => System.Console.WriteLine("被取消了."));
//模拟传递的场景
Task.Run(async ()=> {
    while (!cancellationToken.IsCancellationRequested)
    {
        System.Console.WriteLine("一直在执行...");
        await Task.Delay(1000);
    }
});
//5s之后取消
tokenSource.CancelAfter(5000);

上述代码五秒之后CancellationTokenSource被取消,因此CancellationTokenSource的Token也会被取消。反映到IsCancellationRequested上就是值为true说明被取消,为false说明没被取消,因此控制台输出的结果是

一直在执行...
一直在执行...
一直在执行...
一直在执行...
一直在执行...
被取消了.

还有另一种方式,也可以主动判断任务是否被取消,不过这种方式简单粗暴,直接是抛出了异常。如果是使用异步的方式的话,需要注意的是Task内部异常的捕获方式,否则对外可能还没有感知到具体异常的原因,它的使用方式是这样的,这里为了演示方便我直接换了一种更直接的方式

CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = tokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("被取消了."));
tokenSource.CancelAfter(5000);
while (true)
{
    //如果操作被取消则直接抛出异常
    cancellationToken.ThrowIfCancellationRequested();
    System.Console.WriteLine("一直在执行...");
    await Task.Delay(1000);
}

执行五秒之后则直接抛出 System.OperationCanceledException: The operation was canceled.异常,异步情况下注意异常处理的方式即可。通过上面这些简单的示例,相信大家对CancellationTokenSource有了一定的认识,大概知道了在什么时候可以使用它,主要是异步取消通知,或者限定时间操作通知等等。CancellationTokenSource是个不错的神器,使用简单功能强大。

源码探究

通过上面的示例,相信大家对CancellationTokenSource有了一个基本的认识,真的是非常强大,而且使用起来也非常的简单,这也是c#语言的精妙之处,非常实用,让你用起来的时候非常舒服,有种用着用着就想跪下的冲动。步入正题,接下来让我们来往深处看看CancellationTokenSource的源码,看看它的工作机制是啥。本文贴出的源码是博主精简过的,毕竟源码太多不太可能全部粘贴出来,主要是跟着它的思路了解它的工作方式。

构造入手

因为这一次呢CancellationTokenSource的初始化函数中有一个比较重要的构造函数,那就是可以设置定时超时的操作,那么我们就从它的构造函数入手[点击查看源码👈]

//全局状态
private volatile int _state;
//未取消状态值
private const int NotCanceledState = 1;

/// <summary>
/// 无参构造初始化状态
/// </summary>
public CancellationTokenSource() => _state = NotCanceledState;

/// <summary>
/// 定时取消构造
/// </summary>
public CancellationTokenSource(TimeSpan delay)
{
    //获取timespan的毫秒数
    long totalMilliseconds = (long)delay.TotalMilliseconds;
    if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(delay));
    }
    //调用InitializeWithTimer
    InitializeWithTimer((int)totalMilliseconds);
}

public CancellationTokenSource(int millisecondsDelay)
{
    if (millisecondsDelay < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsDelay));
    }
    //调用InitializeWithTimer
    InitializeWithTimer(millisecondsDelay);
}

无参构造函数没啥好说的,就是给全局state状态初始化NotCanceledState的初始值,也就是初始化状态。我们比较关注的是可以定时取消的构造函数,虽然是两个构造函数,但是殊途同归,本质都是传递的毫秒整形参数,而且调用的核心方法都是InitializeWithTimer,看来是一个定时器操作,这样不奇怪了,我们看下InitializeWithTimer方法的实现[点击查看源码👈]

//任务完成状态值
private const int NotifyingCompleteState = 2;
//定时器
private volatile TimerQueueTimer? _timer;
//定时器回调初始化
private static readonly TimerCallback s_timerCallback = TimerCallback;
//定时器回调委托本质是调用的CancellationTokenSource的NotifyCancellation方法
private static void TimerCallback(object? state) =>
    ((CancellationTokenSource)state!).NotifyCancellation(throwOnFirstException: false);

private void InitializeWithTimer(uint millisecondsDelay)
{
    if (millisecondsDelay == 0)
    {
        //如果定时的毫秒为0,则设置全局状态为NotifyingCompleteState
        _state = NotifyingCompleteState;
    }
    else
    {
        //如果超时毫秒不为0则初始化定时器,并设置定时器定时的回调
        _timer = new TimerQueueTimer(s_timerCallback, this, millisecondsDelay, Timeout.UnsignedInfinite, flowExecutionContext: false);
    }
}

通过这个方法,我们可以非常清晰的看到定时初始化的核心操作其实就是初始化一个定时器,而定时的时间就是我们初始化传递的毫秒数,其中s_timerCallback是定时的回调函数,即如果等待超时之后则调用这个委托,其本质正是CancellationTokenSource的NotifyCancellation方法,这个方法正是处理超时之后的操作[点击查看源码👈]

//信号控制类,通过信号判断是否需要继续执行或阻塞
private volatile ManualResetEvent? _kernelEvent;
//throwOnFirstException函数是指示如果被取消了是否抛出异常
private void NotifyCancellation(bool throwOnFirstException)
{
    //如果任务已经取消则直接直接释放定时器
    if (!IsCancellationRequested && Interlocked.CompareExchange(ref _state, NotifyingState, NotCanceledState) == NotCanceledState)
    {
        TimerQueueTimer? timer = _timer;
        if (timer != null)
        {
            _timer = null;
            timer.Close();
        }
        //信号量涉及到了一个重要的属性WaitHandle接下来会说
        _kernelEvent?.Set();
        //执行取消操作,是取消操作的核心,讲取消操作的时候咱们会着重说这个
        ExecuteCallbackHandlers(throwOnFirstException);
        Debug.Assert(IsCancellationCompleted, "Expected cancellation to have finished");
    }
}

NotifyCancellation正是处理定时器到时的操作,说白了就是到了指定的时间但是没有手动取消执行的操作,其实也是执行的取消操作,这个方法里涉及到了两个比较重要的点,也是接下来我们会分析的点,这里做一下说明

  • 首先是ManualResetEvent这个实例,这个类的功能是通过信号机制控制是否阻塞或执行后续操作,与之相辅的还有另一个类AutoResetEvent。这两个类实现的效果是一致的,只是ManualResetEvent需要手动重置初始状态,而AutoResetEvent则会自动重置。有关两个类的说明,这里不做过多介绍,有需要了解的同学们可以自行百度。而CancellationTokenSource类的一个重要属性WaitHandle正是使用的它。
  • 还有一个是ExecuteCallbackHandlers方法,这个是CancellationTokenSource执行取消操作的核心操作。为了保证阅读的顺序性,咱们在讲取消操作的时候在重点讲这个方法。

上面提到了,为了保证阅读的顺序性方便理解,咱们在本文接下来会讲解这两部分,就不再初始化这里讲解了,这里做一下标记,以防大家觉得没讲清楚就继续了。

小插曲WaitHandle

上面我们提到了CancellationTokenSource的WaitHandle属性,它是基于ManualResetEvent实现的。这个算是一个稍微独立的地方,我们可以先进行讲解一下[点击查看源码👈]

private volatile ManualResetEvent? _kernelEvent;
internal WaitHandle WaitHandle
{
    get
    {
        ThrowIfDisposed();
        //如果初始化过了则直接返回
        if (_kernelEvent != null)
        {
            return _kernelEvent;
        }

        //初始化一个ManualResetEvent,给定初始值为false
        var mre = new ManualResetEvent(false);
        //线程安全操作如果有别的线程初始了则释放上面初始化的操作
        if (Interlocked.CompareExchange(ref _kernelEvent, mre, null) != null)
        {
            mre.Dispose();
        }

        //如果任务已取消则后续操作不阻塞
        if (IsCancellationRequested)
        {
            _kernelEvent.Set();
        }
        return _kernelEvent;
    }
}

通过这段代码我们可以看到,如果使用了WaitHandle属性则可以使用它实现简单的阻塞通知操作,也就是收到取消通知操作之后我们可以执行WaitHandle之后的操作,但是WaitHandle是internal修饰的,我们该怎么使用呢?莫慌,我们知道CancellationTokenSource的Token属性获取的是CancellationToken实例[点击查看源码👈]

public CancellationToken Token
{
    get
    {
        ThrowIfDisposed();
        return new CancellationToken(this);
    }
}

直接实例化了一个CancellationToken实例返回去了,并传递了当前CancellationTokenSource实例,找到CancellationToken的这个构造函数[点击查看源码👈]

private readonly CancellationTokenSource? _source;
internal CancellationToken(CancellationTokenSource? source) => _source = source;
public WaitHandle WaitHandle => (_source ?? CancellationTokenSource.s_neverCanceledSource).WaitHandle;

通过上面的代码我们可以看到通过CancellationToken实例便可以使用WaitHandle属性,实现我们访问到它的效果,光是说的话可能有点迷糊,通过一个简单的示例我们来了解WaitHandle的使用方式,简单来看下

CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = tokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("被取消了."));
tokenSource.CancelAfter(5000);
Task.Run(()=> {
    System.Console.WriteLine("阻塞之前");
    cancellationToken.WaitHandle.WaitOne();
    System.Console.WriteLine("阻塞取消,执行到了.");
});
System.Console.WriteLine("执行到了这里");

在CancellationTokenSource为被取消之前WaitHandle.WaitOne()方法会阻塞后续执行,也就是下面的输出暂时不会输出。等到CancellationTokenSource执行了Cancel操作里调用了ManualResetEvent的Set方法停止阻塞,后续的输出才会被执行到这是一个同步操作,如果了解ManualResetEvent的同学相信对这个不难理解。为了演示效果我用Task演示异步的情况,所以执行的结果如下所示

执行到了这里
阻塞之前
阻塞取消,执行到了.
被取消了.

注册操作

上面我们大概讲解了一些初始化相关的和一些辅助的操作,接下来我们看一下核心的注册操作,注册操作的用途就是注册CancellationTokenSource取消或超时后需要执行的动作,而注册Register的操作并未由CancellationTokenSource直接进行,而是通过它的Token属性即CancellationToken实例操作的,话不多说直接找到CancellationToken的Register方法[点击查看源码👈]

public CancellationTokenRegistration Register(Action callback) =>
Register(
    s_actionToActionObjShunt,
    callback ?? throw new ArgumentNullException(nameof(callback)),
    useSynchronizationContext: false,
    useExecutionContext: true);

它是直接调用自己的重载方法,注意几个参数,如果看细节的话还是要关注方法参数的。过程就省略了,直接找到最底层的方法[点击查看源码👈]

private CancellationTokenRegistration Register(Action<object?> callback, object? state, bool useSynchronizationContext, bool useExecutionContext)
{
    if (callback == null)
        throw new ArgumentNullException(nameof(callback));

    //_source就是传递下来的CancellationTokenSource
    CancellationTokenSource? source = _source;
    //本质是调用的CancellationTokenSource的InternalRegister方法
    return source != null ?
        source.InternalRegister(callback, state, useSynchronizationContext ? SynchronizationContext.Current : null, useExecutionContext ? ExecutionContext.Capture() : null) :
        default;

从这个最底层的方法我们可以得知,其本质还是调用CancellationTokenSource的InternalRegister方法,核心操作都不在CancellationToken还是在CancellationTokenSource类,CancellationToken更像是依赖CancellationTokenSource的表现类,看一下InternalRegister方法[点击查看源码👈]

//初始化CallbackPartition数组
private volatile CallbackPartition?[]? _callbackPartitions;
//获取初始化上面数组的长度,根据当前CPU核心数获取的
private static readonly int s_numPartitions = GetPartitionCount();

internal CancellationTokenRegistration InternalRegister(
    Action<object?> callback, object? stateForCallback, SynchronizationContext? syncContext, ExecutionContext? executionContext)
{
    //判断有没有被取消
    if (!IsCancellationRequested)
    {
        //如果已被释放直接返回
        if (_disposed)
        {
            return default;
        }
        CallbackPartition?[]? partitions = _callbackPartitions;
        if (partitions == null)
        {
            //首次调用初始化CallbackPartition数组
            partitions = new CallbackPartition[s_numPartitions];
            //判断_callbackPartitions如果为null,则把partitions赋值给_callbackPartitions
            partitions = Interlocked.CompareExchange(ref _callbackPartitions, partitions, null) ?? partitions;
        }
        //获取当前线程使用的分区下标
        int partitionIndex = Environment.CurrentManagedThreadId & s_numPartitionsMask;
        //获取一个CallbackPartition
        CallbackPartition? partition = partitions[partitionIndex];
        if (partition == null)
        {
            //初始化CallbackPartition实例
            partition = new CallbackPartition(this);
            //如果partitions的partitionIndex下标位置为null则使用partition填充
            partition = Interlocked.CompareExchange(ref partitions[partitionIndex], partition, null) ?? partition;
        }

        long id;
        CallbackNode? node;
        bool lockTaken = false;
        //锁住操作
        partition.Lock.Enter(ref lockTaken);
        try
        {
            id = partition.NextAvailableId++;
            //获取CallbackNode,这事真正存储回调的地方,不要被List名字迷惑,其实是要构建链表
            node = partition.FreeNodeList;
            if (node != null)
            {
                //这个比较有意思如果CallbackNode不是首次,则把最新的赋值给FreeNodeList
                partition.FreeNodeList = node.Next;
            }
            else
            {
                //首次的时候初始化一个CallbackNode实例
                node = new CallbackNode(partition);
            }
            node.Id = id;
            //Register的回调操作赋值给了CallbackNode的Callback
            node.Callback = callback;
            node.CallbackState = stateForCallback;
            node.ExecutionContext = executionContext;
            node.SynchronizationContext = syncContext;

            //构建一个CallbackNode链表,从下面的代码可以看出来构建的其实是倒序链表,最新的CallbackNode是表头
            node.Next = partition.Callbacks;
            if (node.Next != null)
            {
                node.Next.Prev = node;
            }
            //Callbacks记录的是当前的节点,如果下一次进来新节点则作为新节点的Next节点
            partition.Callbacks = node;
        }
        finally
        {
            //释放锁
            partition.Lock.Exit(useMemoryBarrier: false);
        }
        //用当前注册回调生成的CallbackNode节点生成CancellationTokenRegistration实例
        var ctr = new CancellationTokenRegistration(id, node);
        //如果未被取消则直接返回
        if (!IsCancellationRequested || !partition.Unregister(id, node))
        {
            return ctr;
        }
    }
    //走到这里说明IsCancellationRequested已经等于true了也就是被取消了,则直接执行该回调
    callback(stateForCallback);
    return default;
}

这里涉及到一个比较核心的类那就是CallbackPartition,这是一个内部类,它的主要用途就是辅助构建执行回调的链表操作,其大概实现是这个样子的[点击查看源码👈]

internal sealed class CallbackPartition
{
    public readonly CancellationTokenSource Source;
    //使用了自旋锁
    public SpinLock Lock = new SpinLock(enableThreadOwnerTracking: false);
    public CallbackNode? Callbacks;
    public CallbackNode? FreeNodeList;
    public long NextAvailableId = 1; 

    public CallbackPartition(CancellationTokenSource source)
    {
        Source = source;
    }

    internal bool Unregister(long id, CallbackNode node)
    {
        //这里面有内容,就不罗列了,判断CallbackNode是否被取消注册,如果为false说明未被取消注册
    }
}

这里面我暂时没有列出Unregister的内容,因为它是和取消相关的,说到取消的时候咱们再看,如果返回true则说明取消成功。这个类核心就是辅助构建Register回调链表的,它的核心都是在操作CallbackNode节点和其构建的回调链表,而CallbackNode则是链表的一个节点定义,其大致结构如下[点击查看源码👈]

internal sealed class CallbackNode
{
    public readonly CallbackPartition Partition;
    //构建链表的核心Prev和Next
    public CallbackNode? Prev;
    public CallbackNode? Next;

    public long Id;
    //回调操作被这个委托记录
    public Action<object?>? Callback;
    public object? CallbackState;
    public ExecutionContext? ExecutionContext;
    public SynchronizationContext? SynchronizationContext;

    public CallbackNode(CallbackPartition partition)
    {
        Partition = partition;
    }

    public void ExecuteCallback()
    {
        //这里也有代码,暂时不列出来,讲取消的时候单独讲解
    }
}

到了这里关于Register涉及到的核心操作都罗列出来了,由于贴出来的是源码相关看着是比较蒙圈的,但是如果顺着看的话其实还是大致的实现思路还是可以理解的,这里我大致的总结一下它的实现思路

  • 首先是构建了CallbackPartition数组,构建这个数组的长度是根据CPU的核心数来决定,每个CallbackPartition是操作的核心,为了防止过多的线程同时操作一个CallbackPartition实例,它采用了为不同线程分区的思路,CallbackPartition维护了构建链表节点的类CallbackNode。
  • CallbackNode是组成链表的核心,CallbackNode每个实例都是链表的一个节点,从它自包含Prev和Next属性便可以看出是一个双向链表。
  • CallbackPartition的核心功能就是为了构建Register进来的回调,从上面的InternalRegister方法里的操作我们可以得知,通过CallbackPartition的辅助将CallbackNode节点构建为一个倒序链表,也就是最新的CallbackNode实例是链表的首节点,而最老的CallbackNode实例则是链表的尾节点。每一次Register进来的回调,都被包装成了CallbackNode添加到这个链表中。

上面InternalRegister方法里我们看到操作CallbackNode的时候,使用了SpinLock自旋锁。短时间锁定的情况下SpinLock更快,因为自旋锁本质上不会让线程休眠,而是一直循环尝试对资源访问,直到可用。所以自旋锁线程被阻塞时,不进行线程上下文切换,而是空转等待。对于多核CPU而言,减少了切换线程上下文的开销,从而提高了性能。

取消操作

上面我们看到了注册相关的操作,注册还是比较统一的,就一种操作方式。取消却有两种方式,一种是超时取消,另一种是主动取消,接下来我们就分别看一下这两种方式分别是如何操作的。

Cancel操作

首先我们来看主动取消的操作方式这个是最简单最直接的方式,而且这个方法属于CancellationTokenSource类,话不多说直接看实现[点击查看源码👈]

public void Cancel() => Cancel(false);

public void Cancel(bool throwOnFirstException)
{
    ThrowIfDisposed();
    NotifyCancellation(throwOnFirstException);
}

重点来了Cancel方法居然也是调用的NotifyCancellation方法,这个方法咱们上面已经看过了。在说定时的方式构造CancellationTokenSource的时候有一个自动取消的操作,提到了NotifyCancellation方法的核心是ExecuteCallbackHandlers方法,这个是CancellationTokenSource执行取消操作的核心操作。还说了为了保证阅读的顺序性,咱们在讲取消操作的时候在重点讲这个方法。看来这个时刻终于还是到来了,直接打开ExecuteCallbackHandlers方法[点击查看源码👈]

private volatile int _threadIDExecutingCallbacks = -1;
private volatile CallbackPartition?[]? _callbackPartitions;
private const int NotifyingCompleteState = 3;
private void ExecuteCallbackHandlers(bool throwOnFirstException)
{
    //获取当前线程ID
    ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId;
    //将_callbackPartitions置为null,但是partitions不为null,因为Exchange返回的是改变之前的值
    CallbackPartition?[]? partitions = Interlocked.Exchange(ref _callbackPartitions, null);
    //如果partitions为null说明是回调已经通知完成状态了直接返回
    if (partitions == null)
    {
        Interlocked.Exchange(ref _state, NotifyingCompleteState);
        return;
    }

    List<Exception>? exceptionList = null;
    try
    {
        //遍历CallbackPartition数组
        foreach (CallbackPartition? partition in partitions)
        {
            //CallbackPartition实例为null说明这个分区未被使用直接跳过
            if (partition == null)
            {
                continue;
            }

            //循环处理CallbackNode链表
            while (true)
            {
                CallbackNode? node;
                bool lockTaken = false;
                //锁住当前操作
                partition.Lock.Enter(ref lockTaken);
                try
                {
                    //获取链表的节点
                    node = partition.Callbacks;
                    //为null说明没Register过直接中断
                    if (node == null)
                    {
                        break;
                    }
                    else
                    {
                        //如果链表遍历不是尾节点,切断和下一个节点的关联
                        if (node.Next != null) node.Next.Prev = null;
                        //把下一个节点赋值给Callbacks
                        partition.Callbacks = node.Next;
                    }
                    //当前执行节点ID
                    _executingCallbackId = node.Id;
                    node.Id = 0;
                }
                finally
                {
                    //退出锁
                    partition.Lock.Exit(useMemoryBarrier: false);
                }

                try
                {
                    //如果当时传递了同步上下文则直接在当时的上下文调用ExecuteCallback委托
                    if (node.SynchronizationContext != null)
                    {
                        node.SynchronizationContext.Send(static s =>
                        {
                            var n = (CallbackNode)s!;
                            n.Partition.Source.ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId;
                            n.ExecuteCallback();
                        }, node);
                        ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId;
                    }
                    else
                    {
                        //如果没有传递SynchronizationContext则直接调用ExecuteCallback委托
                        //即调用Register的注册的委托
                        node.ExecuteCallback();
                    }
                }
                catch (Exception ex) when (!throwOnFirstException)
                {
                    (exceptionList ??= new List<Exception>()).Add(ex);
                }
            }
        }
    }
    finally
    {
        //将全局状态置为通知完成状态
        //即已经调用过Register回调
        _state = NotifyingCompleteState;
        Volatile.Write(ref _executingCallbackId, 0);
        Interlocked.MemoryBarrier();
    }

    //如果中途存在异常则抛出
    if (exceptionList != null)
    {
        Debug.Assert(exceptionList.Count > 0, $"Expected {exceptionList.Count} > 0");
        throw new AggregateException(exceptionList);
    }
}

关于ExecuteCallback方法是CallbackNode类的方法,也就是咱们上面罗列CallbackNode类结构时被省略的方法,它的主要功能就是调用Register的回调,也就是执行Register里的委托。欠下的我会补上来,注意这里是CallbackNode类,接下来看下实现[点击查看源码👈]

public ExecutionContext? ExecutionContext;
public void ExecuteCallback()
{
    ExecutionContext? context = ExecutionContext;
    //如果Register的时候允许传递ExecutionContext则直接用这个上下文执行回调Callback
    //Callback委托也就是承载Register的委托操作
    if (context != null)
    {
        ExecutionContext.RunInternal(context, static s =>
        {
            Debug.Assert(s is CallbackNode, $"Expected {typeof(CallbackNode)}, got {s}");
            CallbackNode n = (CallbackNode)s;

            Debug.Assert(n.Callback != null);
            n.Callback(n.CallbackState);
        }, this);
    }
    else
    {
        Debug.Assert(Callback != null);
        //直接在当前线程调用Callback
        //Callback委托也就是承载Register的委托操作
        Callback(CallbackState);
    }
}

关于取消的核心方法ExecuteCallbackHandlers的重要操作,咱们已经罗列出来了,其实我们看到注册的思路的时候,就已经能猜到执行取消回调的大致思路了,既然Register的时候进行了拉链,那么取消执行注册回调肯定是变量链表执行里面的Callback了,大致总结一下

  • 执行Cancel之后核心操作还是针对构建的CallbackNode链表进行遍历,咱们之前说过构建的CallbackNode链表是倒序链表,最新的节点放在链表的首部,这也就解释了为啥我们上面的示例Register多个委托的时候,最先输出的是最后注册委托。
  • Register注册时候有参数判断是否需要传递当前同步上下文SynchronizationContext和执行上下文ExecutionContext,作用就是为了是否在当时的上下文环境执行Callback回调操作。
  • 上面的遍历代码我们看到了会执行CallbackNode.Next.Prev=null的操作,是为了断开当前链表节点和上下节点的关系,个人感觉是为了切断对象引用方便释放的,防止内存泄漏,同时也说明了默认情况下Register的的回调函数执行是一次性的,当执行完Cancel操作之后当前CancellationToken实例也就失效了。

CancelAfter操作

之前我们演示的时候说过有两种方式可以执行超时取消操作,一种是在构建CancellationTokenSource实例构造的时候传递超时时间,还有另一种是使用CancelAfter操作,这个方法表示在指定时间之后取消,效果上等同于实例化CancellationTokenSource的时候传递超时时间的操作,废话不多说直接罗列代码[点击查看源码👈]

public void CancelAfter(TimeSpan delay)
{
    long totalMilliseconds = (long)delay.TotalMilliseconds;
    if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(delay));
    }
    //调用的是重载的CancelAfter方法
    CancelAfter((int)totalMilliseconds);
}

private static readonly TimerCallback s_timerCallback = obj =>
{
    ((CancellationTokenSource)obj).NotifyCancellation(throwOnFirstException: false);
};

public void CancelAfter(int millisecondsDelay)
{
    //传递的毫秒数不能小于-1
    if (millisecondsDelay < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsDelay));
    }

    //如果已经取消则直接返回
    if (IsCancellationRequested)
    {
        return;
    }

    //注册一个定时器执行s_timerCallback
    //s_timerCallback在上面我们介绍过了 本这就是调用CancellationTokenSource的NotifyCancellation方法
    TimerQueueTimer? timer = _timer;
    if (timer == null)
    {
        timer = new TimerQueueTimer(s_timerCallback, this, Timeout.UnsignedInfinite, Timeout.UnsignedInfinite, flowExecutionContext: false);
        TimerQueueTimer? currentTimer = Interlocked.CompareExchange(ref _timer, timer, null);
        if (currentTimer != null)
        {
            timer.Close();
            timer = currentTimer;
        }
    }

    try
    {
        timer.Change((uint)millisecondsDelay, Timeout.UnsignedInfinite);
    }
    catch (ObjectDisposedException)
    {
    }
}

通过上面的源码我们可以看到CancelAfter的操作代码和传递超时时间构造CancellationTokenSource的代码基本上是一致的,都是通过TimerQueueTimer的方式定时触发调用CancellationTokenSource的NotifyCancellation方法,而NotifyCancellation方法的核心实现就是ExecuteCallbackHandlers方法,这些方法咱们上面都有讲解过,就不重复介绍了,这样关于取消相关的操作我们也就全部讲解完成了。

总结

本文我们主要讲解了C#取消令牌CancellationTokenSource,虽然设计到的类并不多,但是这部分源码并不少,而且也只是讲解核心功能的部分源码,有兴趣的同学可以自行阅读这个类相关代码,如果你觉得你的GitHub比较不给力推荐一个可以阅读CoreCLR源码的网站source.dot.net这个网站看到的是目前CoreCLR最新的源码,可以直接连接到GitHub非常方便,但是最新版本的源码和稳定版本的有些差别,这个还需要注意。由于文章比较长,再加上笔者技术能力和文笔能力都有限,这里做一下简单的总结

  • CancellationTokenSource的用途就是可以感知到取消操作,其中涉及到的Register回调、WaitHandle、IsCancellationRequested都能实现这个功能,当然它还支持超时取消操作。
  • CancellationTokenSource的Register和Cancel相关成双成对的,虽然有CancelAfter和构造传递超时时间的方式,其本质和Cancel操作是一样的。
  • CancellationTokenSource的核心操作原理,是通过CallbackPartition和CallbackNode构建倒序链表,Register的时候通过Callback委托构建链表,Cancel的时候遍历构建的链表执行Callback,虽然有一堆额外操作,但是核心工作方式就是链表操作。
  • 需要注意的是,默认情况下CancellationTokenSource产生的CancellationToken是一次性的,取消了之后是没有办法进行重置的,当然微软已经为我们提供了IChangeToken去解决了CancellationToken重复触发的问题,请放心使用。

由于本篇文章篇幅较长,加上笔者能力有限,文笔更是一般,如果讲解的不清楚还望谅解,或者感兴趣的同学可以自行阅读源码。关于看源码每个人都有自己的关注点,我一般的初衷都是弄明白它的原理,顺便学习下它代码风格或思路。学无止境,结果有时候并不那么重要,过程才重要。就和许多人追求自己能有到达什么样的高度,成功其实只是成长过程中顺便的一种表现,就和你如果不满现状,说明你在很早之前没想过改变自己一样。

到此这篇关于运用示例简单讲解C#取消令牌CancellationTokenSource的文章就介绍到这了,更多相关C# CancellationTokenSource内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • C# MemoryStream类案例详解

    MemoryStream位于System.IO命名空间,为系统内存提供流式的读写操作.常作为其他流数据交换时的中间对象操作. MemoryStream类封装一个字节数组,在构造实例时可以使用一个字节数组作为参数,但是数组的长度无法调整.使用默认无参数构造函数创建实例,可以使用Write方法写入,随着字节数据的写入,数组的大小自动调整. 在对MemoryStream类中数据流进行读取时,可以使用seek方法定位读取器的当前的位置,可以通过指定长度的数组一次性读取指定长度的数据.ReadByte方法

  • C#泛型的使用及示例详解

    目录 一.什么是泛型 二.为什么使用泛型 三.泛型类型参数 四.泛型类 五.泛型约束 六.泛型的协变和逆变 七.泛型缓存 这篇文章主要讲解C#中的泛型,泛型在C#中有很重要的地位,尤其是在搭建项目框架的时候. 一.什么是泛型 泛型是C#2.0推出的新语法,不是语法糖,而是2.0由框架升级提供的功能. 我们在编程程序时,经常会遇到功能非常相似的模块,只是它们处理的数据不一样.但我们没有办法,只能分别写多个方法来处理不同的数据类型.这个时候,那么问题来了,有没有一种办法,用同一个方法来处理传入不同种

  • C# 字符串string和内存流MemoryStream及比特数组byte[]之间相互转换

    定义string变量为str,内存流变量为ms,比特数组为bt 1.字符串转比特数组 复制代码 代码如下: (1)byte[] bt=System.Text.Encoding.Default.GetBytes("字符串"); (2)byte[] bt=Convert.FromBase64String("字符串"); 2.字符串转流 复制代码 代码如下: (1)MemoryStream ms=new MemoryStream(System.Text.Encoding.

  • c#多线程之间的排他锁的实现

    我们很多时候会碰到这样的问题,使用多线程刷一个表的数据时需要多个线程不能重复提取数据,那么这个时候就需要使用到线程的排他锁了. 在c#里面其实很简单,下面先来看一个简单的小例子 Thread pingTask = new Thread(new ThreadStart(delegate { //从数据库获取1000条数 var list = getdata(); })); //启动线程 pingTask.Start(); 如果这个时候我们开启多个线程 代码如下 for (int i = 0; i

  • XmlReader 读取器读取内存流 MemoryStream 的注意事项

    MemoryStream对象提供了无需进行IO就可以创建Stream的方法,XmlTextWriter和XmlReader提供快速书写和读取XML内容的方法,结合MemoryStream,就可以直接在内存中构造XmlTextWriter,并用XmlReader进行读取. 使用MemoryStream和XmlTextWriter进行书写XML,需要注意两点:XmlTextWriter.Flush操作和重设MemoryStream.Position = 0. C#  <%@ Page Languag

  • C#泛型详解及关键字作用

    这篇文章主要来讲讲c#中的泛型,因为泛型在c#中有很重要的位置,对于写出高可读性,高性能的代码有着关键的作用. 一.什么是泛型? 泛型是 2.0 版 C# 语言和公共语言运行库 (CLR) 中的一个非常重要的新功能. 我们在编程程序时,经常会遇到功能非常相似的模块,只是它们处理的数据不一样.但我们没有办法,只能分别写多个方法来处理不同的数据类型.这个时候,那么问题来了,有没有一种办法,用同一个方法来处理传入不同种类型参数的办法呢?泛型的出现就是专门来解决这个问题的,可以看出,微软还是很贴心的.

  • C#的TimeSpan案例详解

    TimeSpan结构:表示一个时间间隔. 它含有以下四个构造函数: TimeSpan(Int64)将 TimeSpan结构的新实例初始化为指定的刻度数. (DateTime.Tick:是计算机的一个计时周期,单位是一百纳秒,即一千万分之一秒) TimeSpan(Int32, Int32, Int32)将 TimeSpan结构的新实例初始化为指定的小时数.分钟数和秒数. TimeSpan(Int32, Int32, Int32, Int32)将 TimeSpan结构的新实例初始化为指定的天数.小时

  • 运用示例简单讲解C#取消令牌CancellationTokenSource

    目录 前言 简单示例 基础操作 定时取消 关联取消 判断取消 源码探究 构造入手 小插曲WaitHandle 注册操作 取消操作 Cancel操作 CancelAfter操作 总结 前言 相信大家在使用C#进行开发的时候,特别是使用异步的场景,多多少少会接触到CancellationTokenSource.看名字就知道它和取消异步任务相关的,而且一看便知大名鼎鼎的CancellationToken就是它生产出来的.不看不知道,一看吓一跳.它在取消异步任务.异步通知等方面效果还是不错的,不仅好用而

  • Vue2.x响应式简单讲解及示例

    一.回顾Vue响应式用法 ​ vue响应式,我们都很熟悉了.当我们修改vue中data对象中的属性时,页面中引用该属性的地方就会发生相应的改变.避免了我们再去操作dom,进行数据绑定. 二.Vue响应式实现分析 对于vue的响应式原理,官网上给了出文字描述 https://cn.vuejs.org/v2/guide/reactivity.html . vue内部主要是通过数据劫持和观察者模式实现的 数据劫持: vue2.x内部使用Object.defineProperty https://dev

  • Spring @Conditional注解示例详细讲解

    目录 前言: 示例: 标注在方法上: 标注在类上: 多个条件类: 前言: @Conditional是Spring4新提供的注解,它的作用是按照一定的条件进行判断,满足条件给容器注册bean. @Conditional的定义: //此注解可以标注在类和方法上 @Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Conditi

  • Scala文件操作示例代码讲解

    目录 1. 读取数据 1.1 按行读取 1.2 按字符读取 Scala使用source.buffered方法按字符读取文件 一个示例 1.3 读取词法单元和数字 1.4 从URL或者其他源读取数据 1.5 读取二进制文件 2. 写入文件 2.1 使用java.io.PrintWriter类 2.2 使用java.io.FileWriter类 2.3 使用java.io.FileOutputStream类 2.4 几种写入的区别 2.5 使用第三方库 3. Scala序列化和反序列化 3.1 什么

  • 简单讲解java中throws与throw的区别

    Java中throws和throw的区别讲解 当然,你需要明白异常在Java中式以一个对象来看待. 并且所有系统定义的编译和运行异常都可以由系统自动抛出,称为标准异常,但是一般情况下Java 强烈地要求应用程序进行完整的异常处理,给用户友好的提示,或者修正后使程序继续执行. 直接进入正题哈: 1.用户程序自定义的异常和应用程序特定的异常,必须借助于 throws 和 throw 语句来定义抛出异常. 1.1   throw是语句抛出一个异常. 语法:throw (异常对象);         

  • Oracle数据库的实例/表空间/用户/表之间关系简单讲解

    完整的Oracle数据库通常由两部分组成:Oracle数据库和数据库实例. 1) 数据库是一系列物理文件的集合(数据文件,控制文件,联机日志,参数文件等): 2) Oracle数据库实例则是一组Oracle后台进程/线程以及在服务器分配的共享内存区. 在启动Oracle数据库服务器时,实际上是在服务器的内存中创建一个Oracle实例(即在服务器内存中分配共享内存并创建相关的后台内存),然后由这个Oracle数据库实例来访问和控制磁盘中的数据文件.Oracle有一个很大的内存快,成为全局区(SGA

  • 简单讲解c++ vector

    在c++中,vector是一个十分有用的容器. 作用:它能够像容器一样存放各种类型的对象,简单地说,vector是一个能够存放任意类型的动态数组,能够增加和压缩数据. vector在C++标准模板库中的部分内容,它是一个多功能的,能够操作多种数据结构和算法的模板类和函数库. 特别注意: 使用vector需要注意以下几点: 1.加入头文件 <vector> 2.如果你要表示的向量长度较长(需要为向量内部保存很多数),容易导致内存泄漏,而且效率会很低: 3.Vector作为函数的参数或者返回值时,

  • Java反射机制的简单讲解

    🌱1. 什么是反射机制? 首先大家应该先了解两个概念,编译期和运行期,编译期就是编译器帮你把源代码翻译成机器能识别的代码,比如编译器把java代码编译成jvm识别的字节码文件,而运行期指的是将可执行文件交给操作系统去执行,JAVA反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法:对于任意一个对象,都能够调用它的任意方法和属性:这种动态获取信息以及动态调用对象方法的功能称为java语言的反射机制 简单说,反射机制值得是程序在运行时能够获取自身的信息.在java中,只要给定类的名字,那么就可以通过反射机制来获得类的所有信息. 🌱2. java反射机制提供了什么功能? 在运行时能够判断任意一个对象所属的类 在运行时构造任意一个类的对象 在运行时判断任意一个类所具有的成员变量和方法 在运行时调用任一对象的方法 在运行时创建新类对象 🌱3.new和反射创建有什么区别呢? ne

  • 简单讲解哈希表

    目录 一.哈希表的概念 1.查找算法 2.哈希表 3.哈希数组 4.关键字 5.哈希函数 6.哈希冲突 7.哈希地址 二.常用哈希函数 1.直接定址法 2.平方取中法 3.折叠法 4.除留余数法 5.位与法 三.常见哈希冲突解决方案 1.开放定址法 1)原理讲解 2)动画演示 2.再散列函数法 1)原理讲解 2)动画演示 3.链地址法 1)原理讲解 2)动画演示 4.公共溢出区法 1)原理讲解 2)动画演示 四.哈希表的实现 1.数据结构定义 2.哈希表初始化 3.哈希函数计算 4.哈希表查找

  • C语言示例代码讲解栈与队列

    目录 栈 栈的定义 顺序栈 顺序栈的定义 顺序栈的初始化 顺序栈的入栈 顺序栈的出栈 取顺序栈的栈顶元素 链栈 队列 队列的定义 队列的顺序表达与实现 队列顺序存储结构 假溢出 循环队列 循环队列的初始化 循环队列的入队 循环队列的出队 链队列 链栈的初始化 链栈的入队 链栈的出队 上文详细的讲解了顺序表与链表的实现,相信大家在顺序表与链表的指基础上,很容易就能学会站和队列,废话不多说,我们马上开始! 栈 栈的定义 栈是一种线性表,但限定这种线性表只能在某一端进行插入和删除操作 假设栈 [s =

随机推荐