c# 线程安全队列的用法原理及使用示例

什么是线程安全?

答:线程安全是多线程编程时的计算机程序代码中的一个概念。在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况。

在多线程并行的情况下会出现共享数据会线程间读取与写入不一直的情况,为了解决这种情况,通常会使用锁来解决,也就是将并行改为串行。但是在使用穿行违背了使用多线程并发的初衷,这种情况下我们可以考虑采用线程安全结构。

先看下线程安全队列的用法:

ConcurrentQueue<int> ts = new System.Collections.Concurrent.ConcurrentQueue<int>();
ts.Enqueue(1);
ts.Enqueue(2);
ts.Enqueue(3);
ts.Enqueue(4);
foreach (var r in ts)
{
  Console.Write($"data:{r} ");
}
Console.WriteLine();
ts.TryPeek(out int pk);
Console.WriteLine($"peek:{pk}");
ts.TryDequeue(out int ck);
ts.Enqueue(5);
ts.Enqueue(6);
Console.WriteLine();
foreach (var r in ts)
{
  Console.Write($"data:{r} ");
}
Console.WriteLine();
Console.ReadLine();

现在我们看下线程安全队列的实现方式:(参考自:.net framework 4.8),核心代码全部做了注释。

总的来说,(总结语放到前面,防止代码篇幅太大,同志们没有耐心翻到最底下~)

1、线程安全队列通过SpinWait自旋类来实现等待并行线程完成与Interlocked原子操作类计数实现的。

2、线程安全队列通过单向链表实现的,链的节点为长度32的数组,通过记录链的头节点与尾节点、以及队列的头尾实现队列的存储与入队、出队操作的。

public class MyConcurrentQueue<T> : IProducerConsumerCollection<T>
{
  [NonSerialized]
  private volatile Segment m_head;

  [NonSerialized]
  private volatile Segment m_tail;

  private T[] m_serializationArray;

  private const int SEGMENT_SIZE = 32;

  [NonSerialized]
  internal volatile int m_numSnapshotTakers = 0;
  /// <summary>
  /// 链尾部节点
  /// </summary>
  public MyConcurrentQueue()
  {
    m_head = m_tail = new Segment(0, this);
  }
  //尝试添加
  bool IProducerConsumerCollection<T>.TryAdd(T item)
  {
    Enqueue(item);
    return true;
  }
  /// <summary>
  /// 尝试从中移除并返回对象
  /// </summary>
  /// <param name="item">
  /// </remarks>
  bool IProducerConsumerCollection<T>.TryTake(out T item)
  {
    return TryDequeue(out item);
  }
  /// <summary>
  /// 判断当前链是否为空
  /// </summary>
  public bool IsEmpty
  {
    get
    {
      Segment head = m_head;
      if (!head.IsEmpty)
        //如果头不为空,则链非空
        return false;
      else if (head.Next == null)
        //如果头节点的下一个节点为空,且为链尾,
        return true;
      else
      //如果头节点为空且不是最后一个节点 ,则标识另一个线程正在写入该数组
      //等待中..
      {
        SpinWait spin = new SpinWait();
        while (head.IsEmpty)
        {
          //此时为空
          if (head.Next == null)
            return true;
          //否则标识正在有线程占用写入
          //线程循环一次
          spin.SpinOnce();
          head = m_head;
        }
        return false;
      }
    }
  }
  /// <summary>
  /// 用来判断链是否在变化
  /// </summary>
  /// <param name="head"></param>
  /// <param name="tail"></param>
  /// <param name="headLow"></param>
  /// <param name="tailHigh"></param>
  private void GetHeadTailPositions(out Segment head, out Segment tail,
    out int headLow, out int tailHigh)
  {
    head = m_head;
    tail = m_tail;
    headLow = head.Low;
    tailHigh = tail.High;
    SpinWait spin = new SpinWait();
    Console.WriteLine($"head.Low:{head.Low},tail.High:{tail.High},head.m_index:{head.m_index},tail.m_index:{tail.m_index}");
    //通过循环来保证值不再更改(也就是说并行线程操作结束)
    //保证线程串行核心的判断逻辑
    while (
      //头尾发生变化
      head != m_head || tail != m_tail
      //如果队列头、尾索引发生变化
      || headLow != head.Low || tailHigh != tail.High
      || head.m_index > tail.m_index)
    {
      spin.SpinOnce();
      head = m_head;
      tail = m_tail;
      headLow = head.Low;
      tailHigh = tail.High;
    }
  }
  /// <summary>
  /// 获取总数
  /// </summary>
  public int Count
  {
    get
    {
      Segment head, tail;
      int headLow, tailHigh;
      GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
      if (head == tail)
      {
        return tailHigh - headLow + 1;
      }
      //头节点长度
      int count = SEGMENT_SIZE - headLow;
      //加上中间其他节点长度
      count += SEGMENT_SIZE * ((int)(tail.m_index - head.m_index - 1));
      //加上尾节点长度
      count += tailHigh + 1;
      return count;
    }
  }

  public object SyncRoot => throw new NotImplementedException();

  public bool IsSynchronized => throw new NotImplementedException();

  public void CopyTo(T[] array, int index)
  {

  }
  /// <summary>
  /// 暂未实现
  /// </summary>
  /// <returns></returns>
  public IEnumerator<T> GetEnumerator()
  {
    return null;
  }
  /// <summary>
  /// 添加
  /// </summary>
  /// <param name="item"></param>
  public void Enqueue(T item)
  {
    SpinWait spin = new SpinWait();
    while (true)
    {
      Segment tail = m_tail;
      if (tail.TryAppend(item))
        return;
      spin.SpinOnce();
    }
  }
  /// <summary>
  /// 尝试删除节点
  /// </summary>
  /// <param name="result"></param>
  /// <returns></returns>
  public bool TryDequeue(out T result)
  {
    while (!IsEmpty)
    {
      Segment head = m_head;
      if (head.TryRemove(out result))
        return true;
    }
    result = default(T);
    return false;
  }
  /// <summary>
  /// 查看最后一个添加入的元素
  /// </summary>
  /// <param name="result"></param>
  /// <returns></returns>
  public bool TryPeek(out T result)
  {
    //原子增加值
    Interlocked.Increment(ref m_numSnapshotTakers);

    while (!IsEmpty)
    {
      //首先从头节点看一下第一个节点是否存在
      Segment head = m_head;
      if (head.TryPeek(out result))
      {
        Interlocked.Decrement(ref m_numSnapshotTakers);
        return true;
      }
    }
    result = default(T);
    Interlocked.Decrement(ref m_numSnapshotTakers);
    return false;
  }

  public void CopyTo(Array array, int index)
  {
    throw new NotImplementedException();
  }

  IEnumerator IEnumerable.GetEnumerator()
  {
    throw new NotImplementedException();
  }

  public T[] ToArray()
  {
    throw new NotImplementedException();
  }
  /// <summary>
  /// 为线程安全队列提供一个 单向链表,
  /// 链表的每个节点存储长度为32的数组
  /// </summary>
  private class Segment
  {
    /// <summary>
    /// 定义一个数组,用于存储每个节点的内容
    /// </summary>
    internal volatile T[] m_array;
    /// <summary>
    /// 定义一个结构数组,用于标识数组中每个节点是否有效(是否存储内容)
    /// </summary>
    internal volatile VolatileBool[] m_state;
    //指针,指向下一个节点数组
    //如果是最后一个节点,则节点为空
    private volatile Segment m_next;
    /// <summary>
    /// 索引,用来存储链表的长度
    /// </summary>
    internal readonly long m_index;
    /// <summary>
    /// 用来标识队列头-数组弹出索引
    /// </summary>
    private volatile int m_low;
    /// <summary>
    /// 用来标识队列尾-数组最新存储位置
    /// </summary>
    private volatile int m_high;
    /// <summary>
    /// 用来标识队列
    /// </summary>
    private volatile MyConcurrentQueue<T> m_source;
    /// <summary>
    /// 实例化链节点
    /// </summary>
    internal Segment(long index, MyConcurrentQueue<T> source)
    {
      m_array = new T[SEGMENT_SIZE];
      m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false
      m_high = -1;
      m_index = index;
      m_source = source;
    }
    /// <summary>
    /// 链表的下一个节点
    /// </summary>
    internal Segment Next
    {
      get { return m_next; }
    }
    /// <summary>
    /// 如果当前节点数组为空返回true,
    /// </summary>
    internal bool IsEmpty
    {
      get { return (Low > High); }
    }
    /// <summary>
    /// 非安全添加方法(无判断数组长度)
    /// </summary>
    /// <param name="value"></param>
    internal void UnsafeAdd(T value)
    {
      m_high++;
      m_array[m_high] = value;
      m_state[m_high].m_value = true;
    }

    internal Segment UnsafeGrow()
    {
      Segment newSegment = new Segment(m_index + 1, m_source);
      m_next = newSegment;
      return newSegment;
    }

    /// <summary>
    /// 如果当前数组满了 >=32,则链扩展节点。
    /// </summary>
    internal void Grow()
    {
      //重新船舰数组
      Segment newSegment = new Segment(m_index + 1, m_source);
      //赋值给next指针
      m_next = newSegment;
      //将节点添加到链
      m_source.m_tail = m_next;
    }

    /// <summary>
    /// 在末尾添加元素
    /// </summary>
    /// <param name="value">元素</param>
    /// <param name="tail">The tail.</param>
    /// <returns>如果附加元素,则为true;如果当前数组已满,则为false</returns>
    /// <remarks>如果附加指定的元素成功,并且在此之后数组满了,在链上添加新节点(节点为32长度数组)  </remarks>
    internal bool TryAppend(T value)
    {
      //如果数组已满则跳出方法
      if (m_high >= SEGMENT_SIZE - 1)
      {
        return false;
      }
      //局部变量初始化
      int newhigh = SEGMENT_SIZE;
      try
      { }
      finally
      {
        //原子递增
        newhigh = Interlocked.Increment(ref m_high);
        if (newhigh <= SEGMENT_SIZE - 1)
        {
          m_array[newhigh] = value;
          m_state[newhigh].m_value = true;
        }
        //如果数组满了,则扩展链节点。
        if (newhigh == SEGMENT_SIZE - 1)
        {
          Grow();
        }
      }
      //如果 newhigh <= SEGMENT_SIZE-1, 这意味着当前线程成功地占据了一个位置
      return newhigh <= SEGMENT_SIZE - 1;
    }

    /// <summary>
    /// 尝试从链的头部数组删除节点
    /// </summary>
    /// <param name="result"></param>
    /// <returns></returns>
    internal bool TryRemove(out T result)
    {
      SpinWait spin = new SpinWait();
      int lowLocal = Low, highLocal = High;
      while (lowLocal <= highLocal)
      {
        //获取队头索引
        if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal)
        {
          //如果要弹出队列的值不可用,说明这个位置被并行线程获取到了权限,但是值还未写入。
          //通过线程自旋等待值写入
          SpinWait spinLocal = new SpinWait();
          while (!m_state[lowLocal].m_value)
          {
            spinLocal.SpinOnce();
          }
          //取出值
          result = m_array[lowLocal];
          // 如果没有其他线程读取(GetEnumerator()、ToList()) 执行删除
          // 如 TryPeek 的时候m_numSnapshotTakers会在进入方法体时++,在出方法体--
          // 清空该索引下的值
          if (m_source.m_numSnapshotTakers <= 0)
            m_array[lowLocal] = default(T);
          //如果说lowLocal+1 = 32 说明当前链节点的数组已经全部出队
          if (lowLocal + 1 >= SEGMENT_SIZE)
          {
            //由于lowLocal <= highLocal成立
            //lowLocal + 1 >= SEGMENT_SIZE 如果成立 ,且m_next == null 成立,
            //说明在此时有其他线程正在做扩展链结构
            //那么当前线程需要等待其他线程完成扩展链表,再做出队操作。
            spinLocal = new SpinWait();
            while (m_next == null)
            {
              spinLocal.SpinOnce();
            }
            m_source.m_head = m_next;
          }
          return true;
        }
        else
        {
          //此时说明 当前线程竞争资源失败,做短暂自旋后继续竞争资源
          spin.SpinOnce();
          lowLocal = Low; highLocal = High;
        }
      }
      //失败的情况下返回空值
      result = default(T);
      return false;
    }
    /// <summary>
    /// 尝试获取队列头节点元素
    /// </summary>
    internal bool TryPeek(out T result)
    {
      result = default(T);
      int lowLocal = Low;
      //校验当前队列是否正确
      if (lowLocal > High)
        return false;
      SpinWait spin = new SpinWait();
      //如果头节点无效,则说明当前节点被其他线程占用,并在做写入操作,
      //需要等待其他线程写入后再执行读取操作
      while (!m_state[lowLocal].m_value)
      {
        spin.SpinOnce();
      }
      result = m_array[lowLocal];
      return true;
    }
    /// <summary>
    /// 返回队列首位置
    /// </summary>
    internal int Low
    {
      get
      {
        return Math.Min(m_low, SEGMENT_SIZE);
      }
    }
    /// <summary>
    /// 获取队列长度
    /// </summary>
    internal int High
    {
      get
      {
        //如果m_high>SEGMENT_SIZE,则表示超出范围,我们应该返回 SEGMENT_SIZE-1
        return Math.Min(m_high, SEGMENT_SIZE - 1);
      }
    }
  }
}
/// <summary>
/// 结构-用来存储整数组每个索引上是否存储值
/// </summary>
struct VolatileBool
{
  public VolatileBool(bool value)
  {
    m_value = value;
  }
  public volatile bool m_value;
}

代码通篇看下来有些长(已经精简了很多,只实现入队、出队、与查看下一个出队的值),不知道有多少人能翻到这里~

说明:

1、TryAppend方法通过Interlocked.Increment()原子递增方法获取下一个数组存储点,通过比对32判断链是否需要增加下一个链节点,也就是说,链的存储空间每次扩展为32个存储位置。

2、TryRemove方法通过 Interlocked.CompareExchange()方法来判断当前是否有并行线程在写入,如果有则通过 while循环 SpinWait类的SpinOnce()方法实现等待写入完成后,再做删除;特别说明,判断是否写入是靠VolatileBool结构来实现的,每个链表的每个节点在存储值的同时每个存储都对应一个VolatileBool结构用来标识当前写入点是否成功写入。特殊情况,如果当前链节点的数组已经空了,则需要pinWait类的SpinOnce()简短的自旋等待并行的写入方法完成扩展链后,再做删除。

3、TryPeek方法,同样会判断要获取的元素是否已经成功写入(不成功则说明并行线程还未完成写入),如果未完成,则通过 while pinWait类的SpinOnce()来等待写入完成后,再读取元素内容。

现在代码已经看完了,来试下:

MyConcurrentQueue<string> myConcurrentQueue = new MyConcurrentQueue<string>();
for (int i = 0; i < 67; i++)
{
  myConcurrentQueue.Enqueue($"第{i}位");
  Console.WriteLine($"总数:{myConcurrentQueue.Count}");
}

myConcurrentQueue.TryPeek(out string rs);
Console.WriteLine($"TryPeek 总数:{myConcurrentQueue.Count}");
for (int i = 0; i < 34; i++)
{
  myConcurrentQueue.TryDequeue(out string result0);
  Console.WriteLine($"TryDequeue 总数:{myConcurrentQueue.Count}");
}
Console.ReadKey();

打印:

head.Low:0,tail.High:0,head.m_index:0,tail.m_index:0
总数:1
head.Low:0,tail.High:1,head.m_index:0,tail.m_index:0
总数:2
head.Low:0,tail.High:2,head.m_index:0,tail.m_index:0
总数:3
head.Low:0,tail.High:3,head.m_index:0,tail.m_index:0
总数:4
head.Low:0,tail.High:4,head.m_index:0,tail.m_index:0
总数:5
head.Low:0,tail.High:5,head.m_index:0,tail.m_index:0
总数:6
head.Low:0,tail.High:6,head.m_index:0,tail.m_index:0
总数:7
head.Low:0,tail.High:7,head.m_index:0,tail.m_index:0
总数:8
head.Low:0,tail.High:8,head.m_index:0,tail.m_index:0
总数:9
head.Low:0,tail.High:9,head.m_index:0,tail.m_index:0
总数:10
head.Low:0,tail.High:10,head.m_index:0,tail.m_index:0
总数:11
head.Low:0,tail.High:11,head.m_index:0,tail.m_index:0
总数:12
head.Low:0,tail.High:12,head.m_index:0,tail.m_index:0
总数:13
head.Low:0,tail.High:13,head.m_index:0,tail.m_index:0
总数:14
head.Low:0,tail.High:14,head.m_index:0,tail.m_index:0
总数:15
head.Low:0,tail.High:15,head.m_index:0,tail.m_index:0
总数:16
head.Low:0,tail.High:16,head.m_index:0,tail.m_index:0
总数:17
head.Low:0,tail.High:17,head.m_index:0,tail.m_index:0
总数:18
head.Low:0,tail.High:18,head.m_index:0,tail.m_index:0
总数:19
head.Low:0,tail.High:19,head.m_index:0,tail.m_index:0
总数:20
head.Low:0,tail.High:20,head.m_index:0,tail.m_index:0
总数:21
head.Low:0,tail.High:21,head.m_index:0,tail.m_index:0
总数:22
head.Low:0,tail.High:22,head.m_index:0,tail.m_index:0
总数:23
head.Low:0,tail.High:23,head.m_index:0,tail.m_index:0
总数:24
head.Low:0,tail.High:24,head.m_index:0,tail.m_index:0
总数:25
head.Low:0,tail.High:25,head.m_index:0,tail.m_index:0
总数:26
head.Low:0,tail.High:26,head.m_index:0,tail.m_index:0
总数:27
head.Low:0,tail.High:27,head.m_index:0,tail.m_index:0
总数:28
head.Low:0,tail.High:28,head.m_index:0,tail.m_index:0
总数:29
head.Low:0,tail.High:29,head.m_index:0,tail.m_index:0
总数:30
head.Low:0,tail.High:30,head.m_index:0,tail.m_index:0
总数:31
head.Low:0,tail.High:-1,head.m_index:0,tail.m_index:1
总数:32
head.Low:0,tail.High:0,head.m_index:0,tail.m_index:1
总数:33
head.Low:0,tail.High:1,head.m_index:0,tail.m_index:1
总数:34
head.Low:0,tail.High:2,head.m_index:0,tail.m_index:1
总数:35
head.Low:0,tail.High:3,head.m_index:0,tail.m_index:1
总数:36
head.Low:0,tail.High:4,head.m_index:0,tail.m_index:1
总数:37
head.Low:0,tail.High:5,head.m_index:0,tail.m_index:1
总数:38
head.Low:0,tail.High:6,head.m_index:0,tail.m_index:1
总数:39
head.Low:0,tail.High:7,head.m_index:0,tail.m_index:1
总数:40
head.Low:0,tail.High:8,head.m_index:0,tail.m_index:1
总数:41
head.Low:0,tail.High:9,head.m_index:0,tail.m_index:1
总数:42
head.Low:0,tail.High:10,head.m_index:0,tail.m_index:1
总数:43
head.Low:0,tail.High:11,head.m_index:0,tail.m_index:1
总数:44
head.Low:0,tail.High:12,head.m_index:0,tail.m_index:1
总数:45
head.Low:0,tail.High:13,head.m_index:0,tail.m_index:1
总数:46
head.Low:0,tail.High:14,head.m_index:0,tail.m_index:1
总数:47
head.Low:0,tail.High:15,head.m_index:0,tail.m_index:1
总数:48
head.Low:0,tail.High:16,head.m_index:0,tail.m_index:1
总数:49
head.Low:0,tail.High:17,head.m_index:0,tail.m_index:1
总数:50
head.Low:0,tail.High:18,head.m_index:0,tail.m_index:1
总数:51
head.Low:0,tail.High:19,head.m_index:0,tail.m_index:1
总数:52
head.Low:0,tail.High:20,head.m_index:0,tail.m_index:1
总数:53
head.Low:0,tail.High:21,head.m_index:0,tail.m_index:1
总数:54
head.Low:0,tail.High:22,head.m_index:0,tail.m_index:1
总数:55
head.Low:0,tail.High:23,head.m_index:0,tail.m_index:1
总数:56
head.Low:0,tail.High:24,head.m_index:0,tail.m_index:1
总数:57
head.Low:0,tail.High:25,head.m_index:0,tail.m_index:1
总数:58
head.Low:0,tail.High:26,head.m_index:0,tail.m_index:1
总数:59
head.Low:0,tail.High:27,head.m_index:0,tail.m_index:1
总数:60
head.Low:0,tail.High:28,head.m_index:0,tail.m_index:1
总数:61
head.Low:0,tail.High:29,head.m_index:0,tail.m_index:1
总数:62
head.Low:0,tail.High:30,head.m_index:0,tail.m_index:1
总数:63
head.Low:0,tail.High:-1,head.m_index:0,tail.m_index:2
总数:64
head.Low:0,tail.High:0,head.m_index:0,tail.m_index:2
总数:65
head.Low:0,tail.High:1,head.m_index:0,tail.m_index:2
总数:66
head.Low:0,tail.High:2,head.m_index:0,tail.m_index:2
总数:67
head.Low:0,tail.High:2,head.m_index:0,tail.m_index:2
TryPeek 总数:67
head.Low:1,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:66
head.Low:2,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:65
head.Low:3,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:64
head.Low:4,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:63
head.Low:5,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:62
head.Low:6,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:61
head.Low:7,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:60
head.Low:8,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:59
head.Low:9,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:58
head.Low:10,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:57
head.Low:11,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:56
head.Low:12,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:55
head.Low:13,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:54
head.Low:14,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:53
head.Low:15,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:52
head.Low:16,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:51
head.Low:17,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:50
head.Low:18,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:49
head.Low:19,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:48
head.Low:20,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:47
head.Low:21,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:46
head.Low:22,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:45
head.Low:23,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:44
head.Low:24,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:43
head.Low:25,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:42
head.Low:26,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:41
head.Low:27,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:40
head.Low:28,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:39
head.Low:29,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:38
head.Low:30,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:37
head.Low:31,tail.High:2,head.m_index:0,tail.m_index:2
TryDequeue 总数:36
head.Low:0,tail.High:2,head.m_index:1,tail.m_index:2
TryDequeue 总数:35
head.Low:1,tail.High:2,head.m_index:1,tail.m_index:2
TryDequeue 总数:34
head.Low:2,tail.High:2,head.m_index:1,tail.m_index:2
TryDequeue 总数:33

有时间希望大家能将代码跑一下,相信会更明白其中的原理。

以上就是c# 线程安全队列的用法原理及使用示例的详细内容,更多关于c# 线程安全队列的资料请关注我们其它相关文章!

(0)

相关推荐

  • 浅谈C#跨线程调用窗体控件(比如TextBox)引发的线程安全问题

    如何:对 Windows 窗体控件进行线程安全调用 访问 Windows 窗体控件本质上不是线程安全的. 如果有两个或多个线程操作某一控件的状态,则可能会迫使该控件进入一种不一致的状态. 还可能会出现其他与线程相关的 Bug,例如争用情况和死锁. 确保以线程安全方式访问控件非常重要. 在未使用 Invoke 方法的情况下,从不是创建某个控件的线程的其他线程调用该控件是不安全的. 以下非线程安全的调用的示例. // This event handler creates a thread that

  • C#向线程中传递多个参数的解决方法(两种)

    问题: 对于多线程编程,很多时候往往需要向线程中传递多个参数,而C#中的线程只接收1个object类型的参数(如下): Thread t = new Thread(new ParameterizedThreadStart(newthread)); t.start(参数); void newthread(object) { ............. } 而现在需要往线程中传递多个参数,比如method方法想用单独的线程来跑 void method(int begin,int end) { ...

  • c# winform 关闭窗体时同时结束线程实现思路

    前不久,做一个winform小程序,是给客户导数据用的.当时就发现调试的时候,线程有点问题,到某个点时就走不动了.但是运行确实没有问题的. 只是在关闭窗体后,资源管理器里,一大堆进程. 当时,客户急着用,小测了下,导出数据无误,进程就先不管了. 后来自己去查资料,发现只要在线程那里设置个属性 复制代码 代码如下: Thread th = new Thread(Excute); th.IsBackground = true;这样就解决问题了. 这个属性的意思就是把线程设置为后台线程. 然后关闭进程

  • C#实现终止正在执行的线程

    本文实例讲述了C#实现终止正在执行的线程的实现方法,并针对一些容易出错的地方进行了深入分析,具体方法如下: 一般来说,很多人都会使用Abort方法来终止线程,其实这种做法并不可取!如果你的线程正在操作临界资源,很有可能会造成资源没有正确释放而出现死锁问题.正确的做法应该是使用标记来终止线程的执行. 基本思路是:定义一个用于描述"停止"信号的变量,在整个程序启动前,设置该变量为false.在线程中,循环判断该变量是否已经被设置为true,如果没有,则继续执行,否则就退出循环并释放资源,然

  • C#创建线程带参数的方法

    1.无参数线程的创建 Thread thread = new Thread(new ThreadStart(getpic)); thread.Start(); private void showmessage() { Console.WriteLine("hello world"); } 2.带一个参数的线程 使用ParameterizedThreadStart,调用 System.Threading.Thread.Start(System.Object) 重载方法时将包含数据的对象传

  • C#队列Queue多线程用法实例

    本文实例讲述了C#队列Queue多线程用法.分享给大家供大家参考.具体分析如下: 这里展示一个例子,供学习使用: private void button_测试Queue结合多线程_Click(object sender, EventArgs e) { Console.WriteLine("初始化队列"); queue = new Queue<string>(); string[] cars = new string[]{"宝马","奔驰&quo

  • C#线程 BeginInvoke和EndInvoke使用方法

    开发语言:C#3.0 IDE:Visual Studio 2008 一.C#线程概述 在操作系统中一个进程至少要包含一个线程,然后,在某些时候需要在同一个进程中同时执行多项任务,或是为了提供程序的性能,将要执行的任务分解成多个子任务执行.这就需要在同一个进程中开启多个线程.我们使用C#编写一个应用程序(控制台或桌面程序都可以),然后运行这个程序,并打开windows任务管理器,这时我们就会看到这个应用程序中所含有的线程数,如下图所示. 如果任务管理器没有"线程数"列,可以[查看]>

  • C#实现多线程写入同一个文件的方法

    本文实例讲述了C#实现多线程写入同一个文件的方法.分享给大家供大家参考.具体实现方法如下: namespace WfpApp { public partial class Form2 : Form { object obj = new object(); public Form2() { InitializeComponent(); System.Threading.Thread thread; string[] users = new string[] { "zkk", "

  • c# winform多线程的小例子

    在文本框中输入一个数字,点击开始累加按钮,程序计算从1开始累计到该数字的结果.因为该累加过程比较耗时,如果直接在UI线程中进行,那么当前窗口将出现假死.为了有更好的用户体验,程序启动一个新的线程来单独执行该计算,然后每隔200毫秒读取一次累加结果,并把结果显示到文本框下方的label控件中.同时,程序支持取消操作,点击取消累计按钮,程序将取消累加操作,并把当前累加值显示到label中.为了方便后面的描述,我把UI线程称作主线程,把执行累加计算的线程称作工作者线程.该过程有两个关键点: 1:如何在

  • c#使用多线程的几种方式示例详解

    (1)不需要传递参数,也不需要返回参数 ThreadStart是一个委托,这个委托的定义为void ThreadStart(),没有参数与返回值. 复制代码 代码如下: class Program { static void Main(string[] args) { for (int i = 0; i < 30; i++) { ThreadStart threadStart = new ThreadStart(Calculate); Thread thread = new Thread(thr

  • C#实现线程安全的简易日志记录方法

    一般在实际项目的开发中,会要求涉及日志记录的问题,比较常用的有Log4Net,NLog等几个,而小项目小工具的话,则无需费此大驾.而譬如串口开发的话,需要记录串口过来的数据等等,这时候就要考虑日志记录上线程的问题.对此,为了方便后续使用,封装了下代码: using System; using System.Diagnostics; using System.IO; using System.Text; using System.Threading; namespace CSharpUtilHel

随机推荐