深入线程安全容器的实现方法

最近写了个小程序用到了C#4.0中的线程安全集合。想起很久以前用C#2.0开发的时候写后台windows服务,为了利用多线程实现生产者和消费者模型,经常要封装一些线程安全的容器,比如泛型队列和字典等等。下面就结合部分MS的源码和自己的开发经验浅显地分析一下如何实现线程安全容器以及实现线程安全容器容易产生的问题。

一、ArrayList

在C#早期版本中已经实现了线程安全的ArrayList,可以通过下面的方式构造线程安全的数组列表:

var array = ArrayList.Synchronized(new ArrayList());

我们从Synchronized方法入手,分析它的源代码看是如何实现线程安全的:


代码如下:

Synchronized        /// <summary>Returns an <see cref="T:System.Collections.ArrayList" /> wrapper that is synchronized (thread safe).</summary>
        /// <returns>An <see cref="T:System.Collections.ArrayList" /> wrapper that is synchronized (thread safe).</returns>
        /// <param name="list">The <see cref="T:System.Collections.ArrayList" /> to synchronize. </param>
        /// <exception cref="T:System.ArgumentNullException">
        ///   <paramref name="list" /> is null. </exception>
        /// <filterpriority>2</filterpriority>
        [HostProtection(SecurityAction.LinkDemand, Synchronization = true)]
        public static ArrayList Synchronized(ArrayList list)
        {
            if (list == null)
            {
                throw new ArgumentNullException("list");
            }
            return new ArrayList.SyncArrayList(list);
        }

继续跟进去,发现SyncArrayList是一个继承自ArrayList的私有类,内部线程安全方法的实现经过分析,很多都是像下面这样lock(注意是lock_root对象而不是数组列表实例对象)一下完事:

lock (this._root)

有心的你可以查看SyncArrayList的源码:


代码如下:

SyncArrayList        [Serializable]
        private class SyncArrayList : ArrayList
        {
            private ArrayList _list;
            private object _root;
            public override int Capacity
            {
                get
                {
                    int capacity;
                    lock (this._root)
                    {
                        capacity = this._list.Capacity;
                    }
                    return capacity;
                }
                set
                {
                    lock (this._root)
                    {
                        this._list.Capacity = value;
                    }
                }
            }
            public override int Count
            {
                get
                {
                    int count;
                    lock (this._root)
                    {
                        count = this._list.Count;
                    }
                    return count;
                }
            }
            public override bool IsReadOnly
            {
                get
                {
                    return this._list.IsReadOnly;
                }
            }
            public override bool IsFixedSize
            {
                get
                {
                    return this._list.IsFixedSize;
                }
            }
            public override bool IsSynchronized
            {
                get
                {
                    return true;
                }
            }
            public override object this[int index]
            {
                get
                {
                    object result;
                    lock (this._root)
                    {
                        result = this._list[index];
                    }
                    return result;
                }
                set
                {
                    lock (this._root)
                    {
                        this._list[index] = value;
                    }
                }
            }
            public override object SyncRoot
            {
                get
                {
                    return this._root;
                }
            }
            internal SyncArrayList(ArrayList list)
                : base(false)
            {
                this._list = list;
                this._root = list.SyncRoot;
            }
            public override int Add(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.Add(value);
                }
                return result;
            }
            public override void AddRange(ICollection c)
            {
                lock (this._root)
                {
                    this._list.AddRange(c);
                }
            }
            public override int BinarySearch(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.BinarySearch(value);
                }
                return result;
            }
            public override int BinarySearch(object value, IComparer comparer)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.BinarySearch(value, comparer);
                }
                return result;
            }
            public override int BinarySearch(int index, int count, object value, IComparer comparer)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.BinarySearch(index, count, value, comparer);
                }
                return result;
            }
            public override void Clear()
            {
                lock (this._root)
                {
                    this._list.Clear();
                }
            }
            public override object Clone()
            {
                object result;
                lock (this._root)
                {
                    result = new ArrayList.SyncArrayList((ArrayList)this._list.Clone());
                }
                return result;
            }
            public override bool Contains(object item)
            {
                bool result;
                lock (this._root)
                {
                    result = this._list.Contains(item);
                }
                return result;
            }
            public override void CopyTo(Array array)
            {
                lock (this._root)
                {
                    this._list.CopyTo(array);
                }
            }
            public override void CopyTo(Array array, int index)
            {
                lock (this._root)
                {
                    this._list.CopyTo(array, index);
                }
            }
            public override void CopyTo(int index, Array array, int arrayIndex, int count)
            {
                lock (this._root)
                {
                    this._list.CopyTo(index, array, arrayIndex, count);
                }
            }
            public override IEnumerator GetEnumerator()
            {
                IEnumerator enumerator;
                lock (this._root)
                {
                    enumerator = this._list.GetEnumerator();
                }
                return enumerator;
            }
            public override IEnumerator GetEnumerator(int index, int count)
            {
                IEnumerator enumerator;
                lock (this._root)
                {
                    enumerator = this._list.GetEnumerator(index, count);
                }
                return enumerator;
            }
            public override int IndexOf(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.IndexOf(value);
                }
                return result;
            }
            public override int IndexOf(object value, int startIndex)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.IndexOf(value, startIndex);
                }
                return result;
            }
            public override int IndexOf(object value, int startIndex, int count)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.IndexOf(value, startIndex, count);
                }
                return result;
            }
            public override void Insert(int index, object value)
            {
                lock (this._root)
                {
                    this._list.Insert(index, value);
                }
            }
            public override void InsertRange(int index, ICollection c)
            {
                lock (this._root)
                {
                    this._list.InsertRange(index, c);
                }
            }
            public override int LastIndexOf(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.LastIndexOf(value);
                }
                return result;
            }
            public override int LastIndexOf(object value, int startIndex)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.LastIndexOf(value, startIndex);
                }
                return result;
            }
            public override int LastIndexOf(object value, int startIndex, int count)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.LastIndexOf(value, startIndex, count);
                }
                return result;
            }
            public override void Remove(object value)
            {
                lock (this._root)
                {
                    this._list.Remove(value);
                }
            }
            public override void RemoveAt(int index)
            {
                lock (this._root)
                {
                    this._list.RemoveAt(index);
                }
            }
            public override void RemoveRange(int index, int count)
            {
                lock (this._root)
                {
                    this._list.RemoveRange(index, count);
                }
            }
            public override void Reverse(int index, int count)
            {
                lock (this._root)
                {
                    this._list.Reverse(index, count);
                }
            }
            public override void SetRange(int index, ICollection c)
            {
                lock (this._root)
                {
                    this._list.SetRange(index, c);
                }
            }
            public override ArrayList GetRange(int index, int count)
            {
                ArrayList range;
                lock (this._root)
                {
                    range = this._list.GetRange(index, count);
                }
                return range;
            }
            public override void Sort()
            {
                lock (this._root)
                {
                    this._list.Sort();
                }
            }
            public override void Sort(IComparer comparer)
            {
                lock (this._root)
                {
                    this._list.Sort(comparer);
                }
            }
            public override void Sort(int index, int count, IComparer comparer)
            {
                lock (this._root)
                {
                    this._list.Sort(index, count, comparer);
                }
            }
            public override object[] ToArray()
            {
                object[] result;
                lock (this._root)
                {
                    result = this._list.ToArray();
                }
                return result;
            }
            public override Array ToArray(Type type)
            {
                Array result;
                lock (this._root)
                {
                    result = this._list.ToArray(type);
                }
                return result;
            }
            public override void TrimToSize()
            {
                lock (this._root)
                {
                    this._list.TrimToSize();
                }
            }
        }

ArrayList线程安全实现过程小结:定义ArrayList的私有实现SyncArrayList,子类内部通过lock同步构造实现线程安全,在ArrayList中通过Synchronized对外间接调用子类。

二、Hashtable

同样,在C#早期版本中实现了线程安全的Hashtable,它也是早期开发中经常用到的缓存容器,可以通过下面的方式构造线程安全的哈希表:

var ht = Hashtable.Synchronized(new Hashtable());

同样地,我们从Synchronized方法入手,分析它的源代码看是如何实现线程安全的:


代码如下:

Synchronized        /// <summary>Returns a synchronized (thread-safe) wrapper for the <see cref="T:System.Collections.Hashtable" />.</summary>
        /// <returns>A synchronized (thread-safe) wrapper for the <see cref="T:System.Collections.Hashtable" />.</returns>
        /// <param name="table">The <see cref="T:System.Collections.Hashtable" /> to synchronize. </param>
        /// <exception cref="T:System.ArgumentNullException">
        ///   <paramref name="table" /> is null. </exception>
        /// <filterpriority>1</filterpriority>
        [HostProtection(SecurityAction.LinkDemand, Synchronization = true)]
        public static Hashtable Synchronized(Hashtable table)
        {
            if (table == null)
            {
                throw new ArgumentNullException("table");
            }
            return new Hashtable.SyncHashtable(table);
        }

继续跟进去,发现SyncHashtable是一个继承自Hashtable和IEnumerable接口的私有类,内部线程安全方法的实现经过分析,很多都是像下面这样lock(注意是lock哈希表的SyncRoot Object实例对象而不是哈希表实例)一下完事:

lock (this._table.SyncRoot)

贴一下SyncHashtable的源码:


代码如下:

[Serializable]
        private class SyncHashtable : Hashtable, IEnumerable
        {
            protected Hashtable _table;
            public override int Count
            {
                get
                {
                    return this._table.Count;
                }
            }
            public override bool IsReadOnly
            {
                get
                {
                    return this._table.IsReadOnly;
                }
            }
            public override bool IsFixedSize
            {
                get
                {
                    return this._table.IsFixedSize;
                }
            }
            public override bool IsSynchronized
            {
                get
                {
                    return true;
                }
            }
            public override object this[object key]
            {
                [TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
                get
                {
                    return this._table[key];
                }
                set
                {
                    lock (this._table.SyncRoot)
                    {
                        this._table[key] = value;
                    }
                }
            }
            public override object SyncRoot
            {
                get
                {
                    return this._table.SyncRoot;
                }
            }
            public override ICollection Keys
            {
                get
                {
                    ICollection keys;
                    lock (this._table.SyncRoot)
                    {
                        keys = this._table.Keys;
                    }
                    return keys;
                }
            }
            public override ICollection Values
            {
                get
                {
                    ICollection values;
                    lock (this._table.SyncRoot)
                    {
                        values = this._table.Values;
                    }
                    return values;
                }
            }
            internal SyncHashtable(Hashtable table)
                : base(false)
            {
                this._table = table;
            }
            internal SyncHashtable(SerializationInfo info, StreamingContext context)
                : base(info, context)
            {
                this._table = (Hashtable)info.GetValue("ParentTable", typeof(Hashtable));
                if (this._table == null)
                {
                    throw new SerializationException(Environment.GetResourceString("Serialization_InsufficientState"));
                }
            }
            [SecurityCritical]
            public override void GetObjectData(SerializationInfo info, StreamingContext context)
            {
                if (info == null)
                {
                    throw new ArgumentNullException("info");
                }
                lock (this._table.SyncRoot)
                {
                    info.AddValue("ParentTable", this._table, typeof(Hashtable));
                }
            }
            public override void Add(object key, object value)
            {
                lock (this._table.SyncRoot)
                {
                    this._table.Add(key, value);
                }
            }
            public override void Clear()
            {
                lock (this._table.SyncRoot)
                {
                    this._table.Clear();
                }
            }
            [TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
            public override bool Contains(object key)
            {
                return this._table.Contains(key);
            }
            [TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
            public override bool ContainsKey(object key)
            {
                return this._table.ContainsKey(key);
            }
            public override bool ContainsValue(object key)
            {
                bool result;
                lock (this._table.SyncRoot)
                {
                    result = this._table.ContainsValue(key);
                }
                return result;
            }
            public override void CopyTo(Array array, int arrayIndex)
            {
                lock (this._table.SyncRoot)
                {
                    this._table.CopyTo(array, arrayIndex);
                }
            }
            public override object Clone()
            {
                object result;
                lock (this._table.SyncRoot)
                {
                    result = Hashtable.Synchronized((Hashtable)this._table.Clone());
                }
                return result;
            }
            IEnumerator IEnumerable.GetEnumerator()
            {
                return this._table.GetEnumerator();
            }
            public override IDictionaryEnumerator GetEnumerator()
            {
                return this._table.GetEnumerator();
            }
            public override void Remove(object key)
            {
                lock (this._table.SyncRoot)
                {
                    this._table.Remove(key);
                }
            }
            public override void OnDeserialization(object sender)
            {
            }
            internal override KeyValuePairs[] ToKeyValuePairsArray()
            {
                return this._table.ToKeyValuePairsArray();
            }
        }

Hashtable线程安全实现过程小结:定义Hashtable的私有实现SyncHashtable,子类内部通过lock同步构造实现线程安全,在Hashtable中通过Synchronized对外间接调用子类。

三、4.0中的线程安全容器


1、ConcurrentQueue

从上面的实现分析来说,封装一个线程安全的容器看起来并不是什么难事,除了对线程安全容器的异常处理心有余悸,其他的似乎按步就班就可以了,不是吗?也许还有更高明的实现吧?

在4.0中,多了一个System.Collections.Concurrent命名空间,怀着忐忑的心情查看C#4.0其中的一个线程安全集合ConcurrentQueue的源码,发现它继承自IProducerConsumerCollection<T>, IEnumerable<T>, ICollection, IEnumerable接口,内部实现线程安全的时候,通过SpinWait和通过互锁构造(Interlocked)及SpinWait封装的Segment,间接实现了线程安全。Segment的实现比较复杂,和线程安全密切相关的方法就是TryXXX那几个方法,源码如下:


代码如下:

private class Segment
        {
            internal T[] m_array;
            private int[] m_state;
            private ConcurrentQueue<T>.Segment m_next;
            internal readonly long m_index;
            private int m_low;
            private int m_high;
            internal ConcurrentQueue<T>.Segment Next
            {
                get
                {
                    return this.m_next;
                }
            }
            internal bool IsEmpty
            {
                get
                {
                    return this.Low > this.High;
                }
            }
            internal int Low
            {
                get
                {
                    return Math.Min(this.m_low, 32);
                }
            }
            internal int High
            {
                get
                {
                    return Math.Min(this.m_high, 31);
                }
            }
            internal Segment(long index)
            {
                this.m_array = new T[32];
                this.m_state = new int[32];
                this.m_high = -1;
                this.m_index = index;
            }
            internal void UnsafeAdd(T value)
            {
                this.m_high++;
                this.m_array[this.m_high] = value;
                this.m_state[this.m_high] = 1;
            }
            internal ConcurrentQueue<T>.Segment UnsafeGrow()
            {
                ConcurrentQueue<T>.Segment segment = new ConcurrentQueue<T>.Segment(this.m_index + 1L);
                this.m_next = segment;
                return segment;
            }
            internal void Grow(ref ConcurrentQueue<T>.Segment tail)
            {
                ConcurrentQueue<T>.Segment next = new ConcurrentQueue<T>.Segment(this.m_index + 1L);
                this.m_next = next;
                tail = this.m_next;
            }
            internal bool TryAppend(T value, ref ConcurrentQueue<T>.Segment tail)
            {
                if (this.m_high >= 31)
                {
                    return false;
                }
                int num = 32;
                try
                {
                }
                finally
                {
                    num = Interlocked.Increment(ref this.m_high);
                    if (num <= 31)
                    {
                        this.m_array[num] = value;
                        this.m_state[num] = 1;
                    }
                    if (num == 31)
                    {
                        this.Grow(ref tail);
                    }
                }
                return num <= 31;
            }
            internal bool TryRemove(out T result, ref ConcurrentQueue<T>.Segment head)
            {
                SpinWait spinWait = default(SpinWait);
                int i = this.Low;
                int high = this.High;
                while (i <= high)
                {
                    if (Interlocked.CompareExchange(ref this.m_low, i + 1, i) == i)
                    {
                        SpinWait spinWait2 = default(SpinWait);
                        while (this.m_state[i] == 0)
                        {
                            spinWait2.SpinOnce();
                        }
                        result = this.m_array[i];
                        if (i + 1 >= 32)
                        {
                            spinWait2 = default(SpinWait);
                            while (this.m_next == null)
                            {
                                spinWait2.SpinOnce();
                            }
                            head = this.m_next;
                        }
                        return true;
                    }
                    spinWait.SpinOnce();
                    i = this.Low;
                    high = this.High;
                }
                result = default(T);
                return false;
            }
            internal bool TryPeek(out T result)
            {
                result = default(T);
                int low = this.Low;
                if (low > this.High)
                {
                    return false;
                }
                SpinWait spinWait = default(SpinWait);
                while (this.m_state[low] == 0)
                {
                    spinWait.SpinOnce();
                }
                result = this.m_array[low];
                return true;
            }
            internal List<T> ToList(int start, int end)
            {
                List<T> list = new List<T>();
                for (int i = start; i <= end; i++)
                {
                    SpinWait spinWait = default(SpinWait);
                    while (this.m_state[i] == 0)
                    {
                        spinWait.SpinOnce();
                    }
                    list.Add(this.m_array[i]);
                }
                return list;
            }
        }

上面的代码稍微分析一下就知道它的作用。ConcurrentQueue的线程安全的Enqueue方法实现如下:


代码如下:

/// <summary>Adds an object to the end of the <see cref="T:System.Collections.Concurrent.ConcurrentQueue`1" />.</summary>
        /// <param name="item">The object to add to the end of the <see cref="T:System.Collections.Concurrent.ConcurrentQueue`1" />. The value can be a null reference (Nothing in Visual Basic) for reference types.</param>
        public void Enqueue(T item)
        {
            SpinWait spinWait = default(SpinWait);
            while (true)
            {
                ConcurrentQueue<T>.Segment tail = this.m_tail;
                if (tail.TryAppend(item, ref this.m_tail))
                {
                    break;
                }
                spinWait.SpinOnce();
            }
        }

ConcurrentQueue<T>线程安全实现过程小结:继承接口,子类内部通过同步构造实现接口的线程安全,直接对外公开调用。

和ArrayList以及Hashtable线程安全的“曲折”实现有点不同,ConcurrentQueue<T>一开始就是朝着线程安全方向实现去的。它没有使用lock,因为大家知道使用lock性能略差,对于读和写操作,应该分开,不能一概而论。ConcurrentQueue<T>具体实现在性能和异常处理上应该已经考虑的更全面周到一点。

在我看来,ConcurrentQueue<T>线程安全的具体实现有多吸引人在其次,IProducerConsumerCollection<T>接口的抽象和提取非常值得称道,查看源码发现ConcurrentStack<T>和ConcurrentBag<T>也继承自该接口。<<CLR via C#>>一书中在谈到接口和抽象类的时候特别举了集合和流(Stream)的例子,微软为什么如此设计,想起来果然很有深意。


代码如下:

public bool TryAdd(TKey key, TValue value)
        {
            if (key == null)
            {
                throw new ArgumentNullException("key");
            }
            TValue tValue;
            return this.TryAddInternal(key, value, false, true, out tValue);
        }

其中内部方法TryAdd的主要实现如下:


代码如下:

private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
        {
            checked
            {
                int hashCode = this.m_comparer.GetHashCode(key);
                ConcurrentDictionary<TKey, TValue>.Node[] buckets;
                bool flag;
                bool result;
                while (true)
                {
                    buckets = this.m_buckets;
                    int num;
                    int num2;
                    this.GetBucketAndLockNo(hashCode, out num, out num2, buckets.Length);
                    flag = false;
                    bool flag2 = false;
                    try
                    {
                        if (acquireLock)
                        {
                            Monitor.Enter(this.m_locks[num2], ref flag2);
                        }
                        if (buckets != this.m_buckets)
                        {
                            continue;
                        }
                        ConcurrentDictionary<TKey, TValue>.Node node = null;
                        for (ConcurrentDictionary<TKey, TValue>.Node node2 = buckets[num]; node2 != null; node2 = node2.m_next)
                        {
                            if (this.m_comparer.Equals(node2.m_key, key))
                            {
                                if (updateIfExists)
                                {
                                    ConcurrentDictionary<TKey, TValue>.Node node3 = new ConcurrentDictionary<TKey, TValue>.Node(node2.m_key, value, hashCode, node2.m_next);
                                    if (node == null)
                                    {
                                        buckets[num] = node3;
                                    }
                                    else
                                    {
                                        node.m_next = node3;
                                    }
                                    resultingValue = value;
                                }
                                else
                                {
                                    resultingValue = node2.m_value;
                                }
                                result = false;
                                return result;
                            }
                            node = node2;
                        }
                        buckets[num] = new ConcurrentDictionary<TKey, TValue>.Node(key, value, hashCode, buckets[num]);
                        this.m_countPerLock[num2]++;
                        if (this.m_countPerLock[num2] > buckets.Length / this.m_locks.Length)
                        {
                            flag = true;
                        }
                    }
                    finally
                    {
                        if (flag2)
                        {
                            Monitor.Exit(this.m_locks[num2]);
                        }
                    }
                    break;
                }
                if (flag)
                {
                    this.GrowTable(buckets);
                    goto IL_131;
                }
                goto IL_131;
                return result;
            IL_131:
                resultingValue = value;
                return true;
            }
        }

同步构造Monitor瞬间吸引眼球,然后它的try…finally异常处理方式是不是也很眼熟?

2、ConcurrentDictionary<TKey, TValue>


对于线程安全的泛型字典ConcurrentDictionary<TKey, TValue>,我们也可以查看它的源码看它的具体实现方式。看源码有1200多行,实现稍微复杂一些。我们仅从最简单的TryAdd方法分析:

四、如法炮制

如果让我来构造实现线程安全容器,最简单直接快速高效的方式就是参考ArrayList和 Hashtable,我们完全可以模仿它们的处理方式,通过继承一个容器,然后内部通过lock一个SyncRoot对象,中规中矩地实现framework中其他容器的线程安全。比如要实现线程安全的泛型队列Queue<T>,贴一下大致的伪代码:


代码如下:

private class SyncQueue<T> : Queue<T>
    {
        #region fields and properties

private Queue<T> queue = null;
        private object syncRoot = null;
        internal object SyncRoot
        {
            get
            {
                return syncRoot;
            }
        }

#endregion

#region constructors

public SyncQueue()
        {
            syncRoot = new object();
            queue = new Queue<T>();
        }

public SyncQueue(IEnumerable<T> collection)
        {
            syncRoot = new object();
            queue = new Queue<T>(collection);
        }

public SyncQueue(int capacity)
        {
            syncRoot = new object();
            queue = new Queue<T>(capacity);
        }

#endregion

#region methods

public new void Enqueue(T item)
        {
            lock (SyncRoot)
            {
                this.Enqueue(item);
            }
        }

public new T Dequeue()
        {
            T result = default(T);
            lock (SyncRoot)
            {
                result = this.queue.Dequeue();
            }
            return result;
        }

public new void Clear()
        {
            lock (SyncRoot)
            {
                this.queue.Clear();
            }
        }

public new bool Contains(T item)
        {
            var exists = false;
            lock (SyncRoot)
            {
                exists = this.queue.Contains(item);
            }
            return exists;
        }

#endregion

}

通过类继承我们可以得到泛型队列的所有特点,需要实现线程安全的地方只要按需重写它即可,对外调用也很简单,直接模仿ArrayList和Hashtable,添加Synchronized方法间接调用队列的子类即可,多么清晰简洁啊,关键时刻copy-paste也很有效嘛!

你可能觉得上面这样不动脑的方式似乎很傻很天真,但这绝对是一种正常人都能想到的思路,谁让MS的数组列表和哈希表就是这么实现的呢?

当然,我们还能想到的一种常见实现方式就是通过组合而不是类继承,实现的伪代码类似下面这样:


代码如下:

public class SyncQueue<T>
    {
        #region fields and properties

private Queue<T> queue = null;
        private object syncRoot = null;
        internal object SyncRoot
        {
            get
            {
                return syncRoot;
            }
        }

#endregion

#region constructors

public SyncQueue()
        {
            syncRoot = new object();
            queue = new Queue<T>();
        }

public SyncQueue(IEnumerable<T> collection)
        {
            syncRoot = new object();
            queue = new Queue<T>(collection);
        }

public SyncQueue(int capacity)
        {
            syncRoot = new object();
            queue = new Queue<T>(capacity);
        }

#endregion

#region methods

public void Enqueue(T item)
        {
            lock (SyncRoot)
            {
                this.Enqueue(item);
            }
        }

public T Dequeue()
        {
            T result = default(T);
            lock (SyncRoot)
            {
                result = this.queue.Dequeue();
            }
            return result;
        }

public void Clear()
        {
            lock (SyncRoot)
            {
                this.queue.Clear();
            }
        }

public bool Contains(T item)
        {
            var exists = false;
            lock (SyncRoot)
            {
                exists = this.queue.Contains(item);
            }
            return exists;
        }

#endregion

}

上面这种方式和类继承的实现方式又有不同。它是通过在内部包装一个容器,然后按需进行方法、属性等等的线程安全处理,其他的所有特点依赖于那一个私有泛型队列组合对象queue。这种情况下泛型SyncQueue和泛型队列是组合关系,依赖性和耦合性更低,相对更灵活,封装性更好,是一种较为通用的设计,实际开发和使用中这种方式比较常见。

到这里,我们至少可以分析得出,实现一般的线程安全容器的思路至少有两种:类继承(内部实现偏向使用组合)和(或)组合,线程安全的地方只要通过framework的同步构造如lock、Interlocked等实现即可。

思考:如果让您实现线程安全容器,您优先会怎么实现呢?

五、线程安全并不真正安全


1、foreach遍历

CacheUtil缓存实现的伪代码如下:


代码如下:

public class CacheUtil
    {
        private static readonly Hashtable ht = Hashtable.Synchronized(new Hashtable());

public static bool TryAdd(object key, object value)
        {
            ht[key] = value; //set方法是线程安全的
            return true;
        }

public static bool TryGet(object key, out object result)
        {
            result = null;
            lock (ht.SyncRoot)
            {
                if (ht.ContainsKey(key))
                {
                    result = ht[key];
                }
            }
            return true;
        }
    }

foreach的代码很简单,从哈希表构造的缓存中取数据并遍历,如下:


代码如下:

object obj = null;
            var isOK = CacheUtil.TryGet("key", out obj);
            if (isOK == false)
            {
                return;
            }
            var list = obj as IList<T>;
            if (list == null)
            {
                return;
            }
            foreach (var item in list) //遍历
            {
                //do something
            }

上面的遍历代码一般情况下是不会有问题的。但是在多线程修改哈希表的Value的情况下,上面的foreach遍历有可能发生异常。为什么呢?下面来简单分析一下:

从代码中可以看出来,哈希表中的Value存放的是IList类型,那么值所保存的应该是一个引用(也就是指针)。
(1)、当线程1通过索引器得到这个IList时,这个TryGet读取操作是线程安全的。接着线程1进行的操作是列表遍历。在foreach进行遍历不为空的List的时候,遍历的其实是存放在IList指针指向的引用。

(2)、在foreach遍历集合的时候,这时候线程2如果正好对哈希表的key所对应的Value进行修改,IList的指针所指向的引用改变了,所以线程1的遍历操作就会抛出异常。

这是一个简单而又经典的陷阱,在哈希表的MSDN线程安全块有一段说明:

Enumerating through a collection is intrinsically not a thread safe procedure. Even when a collection is synchronized, other threads can still modify the collection, which causes the enumerator to throw an exception. To guarantee thread safety during enumeration, you can either lock the collection during the entire enumeration or catch the exceptions resulting from changes made by other threads.

2、通过索引取集合中的数据


列表通过索引取值,一个简单的示例代码如下:


代码如下:

static int GetFirstOrDefault(ThreadSafeList<int> list)
        {
            if (list.Count > 0)
            {
                return list[0];
            }
            return 0;
        }

当列表中的元素为1个的时候,上面的操作非常容易进入一个无厘头的陷阱之中。有人会问怎么会有陷阱,你看取数据之前都判断了啊,逻辑正确了啊,这哪里还有问题吗?

按照类似于1中的分析,GetFirstOrDefault应该可以分为下面两步:

(1)线程1取数据,判断list.Count的时候发现列表内有1个元素,这一步线程安全,没有任何问题,然后准备返回索引为0的元素;

(2)线程2在线程1将要取索引为0的元素之前移除了列表中的唯一元素或者直接将list指向null,这样线程1通过索引取元素就抛出异常了。

3、如何保证容器数据操作安全?


从上面的两个示例,我们得知通常所看到的线程安全实际上并不一定安全。不安全的主要原因就是容器内的数据很容易被其他线程改变,或者可以简要概括为:一段时间差引发的血案。实际上,我们平时所做的业务系统,归根结底很多bug或者隐藏的缺陷都是由不起眼的一小段时间差引起的。

保证容器内的数据和操作都安全,一种简单而有效的方法就是将你所要进行的操作进行“事务”处理。比如示例1中哈希表的Value的遍历操作,通常情况下,我们分作两步:

(1)、(安全地)读取数据

(2)、(不安全地)遍历;

为了达到遍历操作不抛出异常,我们可以把两步合并为一步,抽象出一个线程安全的新方法TryGetAndEnumerate,这样可以保证线程安全地取数据和遍历,具体实现无非是lock一下SyncRoot类似的这种思路。但是这种线程安全的遍历可能代价很高,而且极其不通用。

线程安全集合容易产生的问题和解决方法,请参考JaredPar MSFT的Why are thread safe collections so hard?,这篇文章对设计一个线程安全的容器的指导原则是:

1、Don't add an decision procedures(procedures like Count as decision procedures).  They lead users down the path to bad code.
2、Methods which query the object can always fail and the API should reflect this.

实际上大家都知道利用事务处理思想多用TryXXX方法一般是没错的。

(0)

相关推荐

  • C#键值对容器的介绍

    StringDictionary:默认key不区分大小写 NameValueCollection:默认key区分大小写 KeyedCollection:不是键值对容器,但是比键值对容器更好用,强烈推荐 命名空间using System.Collections.Specialized System.Collections 命名空间包含接口和类,这些接口和类定义各种对象(如列表.队列.位数组.哈希表和字典)的集合. System.Collections.Generic 命名空间包含定义泛型集合的接口

  • PHP解耦的三重境界(浅谈服务容器)

    阅读本文之前你需要掌握:PHP语法,面向对象 在完成整个软件项目开发的过程中,有时需要多人合作,有时也可以自己独立完成,不管是哪一种,随着代码量上升,写着写着就"失控"了,渐渐"丑陋接口,肮脏实现",项目维护成本和难度上升,到了难以维持的程度,只有重构或者重新开发. 第一重境界 假设场景:我们需要写一个处理类,能够同时操作会话,数据库和文件系统.我们或许会这么写. 境界特征:可以运行,但是严重耦合 class DB{ public function DB($arg1

  • 关于STL中的map容器的一些总结

    一.关于map的介绍 map是STL的一个容器,和set一样,map也是一种关联式容器.它提供一对一(其中第一个可以称为关键字,每个关键字只能在map中出现一次,第二个可能称为该关键字的值)的数据处理能力,由于这个特性,有助于我们处理一对一数据.这里说下map内部数据的组织,map内部是自建一颗红黑树(一种非严格意义上的平衡二叉树),这颗树具有对数据自动排序的功能,所以在map内部所有的数据都是有序的.学习map我们一定要理解什么是一对一的数据映射?比如:一个班级中,每个学生的学号跟他的姓名就存

  • Java容器类的深入理解

    Java容器类包含List.ArrayList.Vector及map.HashTable.HashMap ArrayList和HashMap是异步的,Vector和HashTable是同步的,所以Vector和HashTable是线程安全的,而ArrayList和HashMap并不是线程安全的.因为同步需要花费机器时间,所以Vector和HashTable的执行效率要低于ArrayList和HashMap.Collection├List       接口│├LinkedList       链表

  • 关于STL中set容器的一些总结

    1.关于set C++ STL 之所以得到广泛的赞誉,也被很多人使用,不只是提供了像vector, string, list等方便的容器,更重要的是STL封装了许多复杂的数据结构算法和大量常用数据结构操作.vector封装数组,list封装了链表,map和set封装了二叉树等,在封装这些数据结构的时候,STL按照程序员的使用习惯,以成员函数方式提供的常用操作,如:插入.排序.删除.查找等.让用户在STL使用过程中,并不会感到陌生. 关于set,必须说明的是set关联式容器.set作为一个容器也是

  • 剖析Go编写的Socket服务器模块解耦及基础模块的设计

    Server的解耦-通过Router+Controller实现逻辑分发 在实际的系统项目工程中中,我们在写代码的时候要尽量避免不必要的耦合,否则你以后在更新和维护代码的时候会发现如同深陷泥潭,随便改点东西整个系统都要变动的酸爽会让你深切后悔自己当初为什么非要把东西都写到一块去(我不会说我刚实习的时候就是这么干的...) 所以这一篇主要说说如何设计Sever的内部逻辑,将Server处理Client发送信息的这部分逻辑与Sevrer处理Socket连接的逻辑进行解耦- 这一块的实现灵感主要是在读一

  • C#实现根据指定容器和控件名字获得控件的方法

    本文所述为C#实现根据指定容器和控件名字获得控件的方法,在进行C#应用程序设计时有一定的借鉴价值.分享给大家供大家参考借鉴.具体实现方法如下: 功能代码如下: /// <summary> /// 根据指定容器和控件名字,获得控件 /// </summary> /// <param name="obj">容器</param> /// <param name="strControlName">控件名字</

  • C++中的哈希容器unordered_map使用示例

    随着C++0x标准的确立,C++的标准库中也终于有了hash table这个东西. 很久以来,STL中都只提供<map>作为存放对应关系的容器,内部通常用红黑树实现,据说原因是二叉平衡树(如红黑树)的各种操作,插入.删除.查找等,都是稳定的时间复杂度,即O(log n):但是对于hash表来说,由于无法避免re-hash所带来的性能问题,即使大多数情况下hash表的性能非常好,但是re-hash所带来的不稳定性在当时是不能容忍的. 不过由于hash表的性能优势,它的使用面还是很广的,于是第三方

  • 多浏览器支持CSS 容器内容超出(溢出)支持自动换行

    .linebr { clear: both; /* 清除左右浮动 */ width: 100px; /* 必须定义宽度 */ word-break: break-word; /* 文本行的任意字内断开 */ word-wrap: break-word; /* IE */ white-space: -moz-pre-wrap; /* Mozilla */ white-space: -hp-pre-wrap; /* HP printers */ white-space: -o-pre-wrap; /

  • Java Web项目前端规范(采用命名空间使js深度解耦合)

    没有规矩不成方圆,一个优秀的代码架构不仅易于开发和维护,而且是一门管理与执行的艺术. 这几年来经历了很多项目,对代码之间的强耦合及书写不规范,维护性差等问题深恶痛绝.在这里,通过仔细分析后,结合自己的编码习惯总结了一套适用于javaweb项目的前端书写规范,与大家分享一下. ps:感谢阿海的创意,后期整理如下(附文件下载): 一.项目结构 这里和其他项目区别不大,我将模板抽离出来,更容易分析和理解: 解释一下:js主要包括extends(引入第三方的js).module(项目模块自己的js).l

随机推荐