C#实现万物皆可排序的队列方法详解

需求

产品中需要向不同的客户推送数据,原来的实现是每条数据产生后就立即向客户推送数据,走的的是HTTP协议。因为每条数据都比较小,而数据生成的频次也比较高,这就会频繁的建立HTTP连接,而且每次HTTP传输中携带的业务数据都很小,对网络的实际利用率不高。希望能够提高网络的利用率,并降低系统的负载。

分析

一个很自然的想法就是将多条数据一起发送,这里有几个关键点:

1、多条数据的聚合逻辑: 是攒够几条发送,还是按照时间周期发送。如果是攒够几条发送,在数据比较稀疏或者产生频率不那么稳定的时候,攒够需要的数据条数可能比较困难,这时候还得需要一个过期时间,因为客户可能接受不了太多的延迟。既然不管怎样都需要使用时间进行控制,我这里索性就选择按照时间周期发送了。思路是:自上次发送时间起,经过了某个时长之后,就发送客户在这段时间内产生的所有数据。

2、数据到期判断方法:既然选择了按照时间周期发送,那么就必须有办法判断是否到了发送时间。一个很简单的想法就是轮询,把所有客户轮询一遍,看看谁的数据到期了,就发送谁的。这个算法的时间复杂度是O(N),如果客户比较多,就会消耗过多的时间在这上边。还有一个办法:如果客户按照时间排序好了,那么只需要取时间最早的客户的数据时间判断就好了,满足就发送,一直向后找,直到获取的客户数据时间不符合条件,则退出处理,然后等一会再进行判断处理。这就需要有一个支持排序的数据结构,写入数据时自动排序,这种数据结构的时间复杂度一般可以做到O(log(n))。对于这个数据结构的读写操作原理上就是队列的操作方式,只不过是个可排序的队列。

3、区分客户:不同客户的数据接收地址不同,向具体某个客户发送数据时,应该能比较方便的聚合他的数据,最好是直接就能拿到需要发送的数据。可以使用字典数据结构来满足这个需求,取某个客户数据的时间复杂度可以降低到O(1)。

4、数据的安全性问题:如果程序在数据发送成功之前退出了,未发送的数据怎么办?是还能继续发送,还是就丢掉不管了。如果要在程序重启后恢复未发送成功的数据,则必须将数据同步到别的地方,比如持久化到磁盘。因为我这里的数据安全性要求不高,丢失一些数据也是允许的,所以要发送的数据收到之后放到内存就行了。

实现

上文提到可排序的数据结构,可以使用SortedList<TKey,TValue>,键是时间,值是这个时间产生了数据的客户标识列表。不过它的读写操作不是线程安全的,需要自己做同步,这里简单点就使用lock了。

对于不同客户的数据,为了方便获取,使用Dictionary<TKey,TValue>来满足,键是客户的标识,值是累积的未发送客户数据。这个数据读写也不是线程安全的,可以和SortedList的读写放到同一个lock中。

下边是它们的定义:

SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>();
Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
readonly object _lock = new object();

插入数据的时候,需要先写入SortedList,然后再写入Dictionary。代码逻辑比较简单,请看:

    public void Publish(TKey key, TValue value)
    {
        DateTime now = DateTime.Now;
        lock (_lock)
        {
            if (_queue.TryGetValue(now, out List<TKey>? keys))
            {
                if (!keys!.Contains(key))
                {
                    keys.Add(key);
                }
            }
            else
            {
                _queue.Add(now, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                values.Add(value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }

对于消费数据,这里采用拉数据的模式。最开始写的方法逻辑是:读取一条数据,处理它,然后从队列中删除。但是这个逻辑需要对队列进行读写,所以必须加锁。一般处理数据比较耗时,比如这里要通过HTTP发送数据,加锁的话就可能导致写数据到队列时阻塞的时间比较长。所以这里实现的是把可以发送的数据全部提取出来,然后就释放锁,数据的处理放到锁的外部实现,这样队列的读写性能就比较好了。

    public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        DateTime now = DateTime.Now;

        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var first = _queue.First();
                var diffMillseconds = now.Subtract(first.Key).TotalMilliseconds;
                if (diffMillseconds < _valueDequeueMillseconds)
                {
                    break;
                }

                var keys = first.Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }

这段代码比较长一些,我梳理下逻辑:取队列的第一条数据,判断时间是否达到发送周期,未达到则直接退出,方法返回空列表。如果达到发送周期,则取出第一条数据中存储的客户标识,然后根据这些标识获取对应的客户未发送数据,将这些数据按照客户维度添加到返回列表中,将这些客户及其数据从队列中移除,返回有数据的列表。这里还增加了一个拉取数据的条数限制,方便根据业务实际情况进行控制。

再来看一下怎么使用这个队列,这里模拟多个生产者加一个消费者,其实可以任意多个生产者和消费者:

TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000);

List<Task> publishTasks = new List<Task>();

for (int i = 0; i < 4; i++)
{
    var j = i;
    publishTasks.Add(Task.Factory.StartNew(() =>
    {
        int k = 0;
        while (true)
        {
            queue.Publish($"key_{k}", $"value_{j}_{k}");
            Thread.Sleep(15);
            k++;
        }
    }, TaskCreationOptions.LongRunning));
}

Task.Factory.StartNew(() =>
{
    while (true)
    {
        var list = queue.Pull(100);
        if (list.Count <= 0)
        {
            Thread.Sleep(100);
            continue;
        }

        foreach (var item in list)
        {
            Console.WriteLine($"{DateTime.Now.ToString("mmss.fff")}:{item.key}, {string.Join(",", item.value)}");
        }
    }

}, TaskCreationOptions.LongRunning);

Task.WaitAll(publishTasks.ToArray());

以上就是针对这个特定需求实现的一个按照时间进行排序的队列。

万物皆可排序的队列
我们很容易想到,既然可以按照时间排序,那么按照别的数据类型排序也是可以的。这个数据结构可以应用的场景很多,比如按照权重排序的队列、按照优先级排序的队列、按照年龄排序的队列、按照银行存款排序的队列,等等。这就是一个万物皆可排序的队列。

我这里把主要代码贴出来(完整代码和示例请看文末):

public class SortedQueue<TSortKey, TKey, TValue>
where TSortKey : notnull, IComparable
where TKey : notnull
where TValue : notnull
{
    Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();

    SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>();

    readonly object _lock = new object();

    /// <summary>
    /// Create a new instance of SortedQueue
    /// </summary>
    public SortedQueue(int maxNumberOfMessageConsumedOnce)
    {
    }

    /// <summary>
    /// Publish a message to queue
    /// </summary>
    /// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param>
    /// <param name="key">The message key.</param>
    /// <param name="value">The message value.</param>
    public void Publish(TSortKey sortKey, TKey key, TValue value)
    {
        lock (_lock)
        {
            if (_queue.TryGetValue(sortKey, out List<TKey>? keys))
            {
                keys.Add(key);
            }
            else
            {
                _queue.Add(sortKey, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                values.Add(value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }

    /// <summary>
    /// Pull a batch of messages.
    /// </summary>
    /// <param name="maxNumberOfMessages">The maximum number of pull messages.</param>
    /// <returns></returns>
    public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var keys = _queue.First().Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }
}

代码逻辑还是比较简单的,就不罗嗦了,如有问题欢迎留言交流。

再说数据安全

因为在这个实现中所有待处理的数据都在内存中,丢失数据会带来一定的风险,因为我这个程序前边还有一个队列,即使程序崩溃了,也只损失没处理的一小部分数据,业务上可以接受,所以这样做没有问题。如果你对这个程序感兴趣,需要慎重考虑你的应用场景。

来看看数据丢失可能发生的两种情况:

一是数据还在队列中时程序重启了:对于这种情况,前文提到将数据同步到其它地方,比如写入Redis、写入数据库、写入磁盘等等。不过因为网络IO、磁盘IO较慢,这往往会带来吞吐量的大幅下降,想要保证一定的吞吐量,还得引入一些分片机制,又因为分布式的不可靠,可能还得增加一些容错容灾机制,比较复杂,可以参考Kafka。

二是数据处理的时候失败了:对于这种情况,可以让程序重试;但是如果异常导致程序崩溃了,数据已经从内存或者其它存储中移除了,数据还是会发生丢失。这时候可以采用一个ACK机制,处理成功后向队列发送一个ACK,携带已经处理的数据标识,队列根据标识删除数据。否则消费者还能消费到这些数据。

这些问题并不一定要完全解决,还是得看业务场景,有可能你把数据持久化到Redis就够了,或者你也不用引入ACK机制,记录下处理到哪一条了就行了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • C#实现泛型动态循环数组队列的方法

    任务 循环数组 实现目标:(1)创建一个新的数组数据结构; (2)该数据结构为泛型; (3)可以按照元素多少进行扩容缩容; (4)进行添加删除操作的时间复杂度小于O(n); 优势:在取出放入的操作中消耗的资源更少 劣势:取出特定元素或特定下标元素平均消耗的资源为普通数组平均消耗资源的最大值 循环数组队列 实现目标:(1)根据循环数组构建出循环的队列数据结构 优势:节省资源,运行速度快: 劣势:不能灵活取出 重点:如何实现循环的计算下标语句. 循环下标语句 完整代码: using System;

  • C#实现优先队列和堆排序

    目录 优先队列 1.API 2.初级实现 3.堆的定义 二叉堆表示法 4.堆的算法 上浮(由下至上的堆的有序化) 下沉(由上至下的堆的有序化) 改进 堆排序 1.堆的构造 2.下沉排序 先下沉后上浮 优先队列 许多应用程序都需要处理有序的元素,但不一定要求它们全部有序,或是不一定要一次就将它们排序.很多情况下是收集一些元素,处理当前键值最大的元素,然后再收集更多的元素,再处理当前键值最大的元素.这种情况下,需要的数据结构支持两种操作:删除最大的元素和插入元素.这种数据结构类型叫优先队列. 这里,

  • C#集合之队列的用法

    队列是其元素按照先进先出(FIFO)的方式来处理的集合.队列使用System.Collections.Generic名称空间中的泛型类Queue<T>实现.在内部,Queue<T>类使用T类型的数组,这类似List<T>(https://www.jb51.net/article/244084.htm)类型.队列实现ICollection和IEnumerable<T>接口,但没有实现ICollection<T>接口,所以ICollection<

  • C#多线程处理多个队列数据的方法

    本文实例讲述了C#多线程处理多个队列数据的方法.分享给大家供大家参考.具体实现方法如下: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Collections; using System.Windows.Forms; namespace ThredProcessQueue { //用于顯示狀態的代理

  • C#实现顺序队列和链队列的代码实例

    和上篇栈的实现基本是一个思路: 废话不多说,直接写代码吧 //自定义队列接口 namespace 队列 { interface IQueue<T> { int Count { get; } int GetLength(); bool IsEmpty(); void Clear(); void Enqueue(T item); T Dequeue(); T Peek(); } } //顺序队列的实现类 namespace 队列 { class SeqQueue<T> : IQueue

  • C#队列的简单使用

    队列的特性很简答,就是先进先出,一般利用数组来实现. 实现队列自然要实现几个函数:入队,出队,判断队满,判断队空,获得队头,队尾. 实现队列的关键在于队头指针和队尾指针的设置: 假设初始状态下,队头指针为0,队尾指针为-1,那么两个指针指的位置分别就是队头与队尾.当队尾的下一个是队头时队空,当队尾的下下个是队头时队满. 假设初始状态下,队头队尾指针都为0,那么队尾指针指的是队尾,而队头指针的下一个位置才是队头.当两指针相等时队空,当队尾指针的下一个是队头时队满. 如果我们画个图就会发现,不管怎么

  • C#中的队列Queue<T>与堆栈Stack<T>

    一.概述: Queue<T>队列,对象的先进先出集合(“FIFO”).Stack<T>栈,对象的后进先出集合(”LIFO”). Queue<T>.Stack<T>类似于List<T>,但 Queue<T>没有IList<T>,所以不能用索引访问队列.也没有实现ICollection<T>,无Add,Remove等方法. 二.操作 1.入队列:Enqueue() Queue<string> nums

  • C#实现万物皆可排序的队列方法详解

    需求 产品中需要向不同的客户推送数据,原来的实现是每条数据产生后就立即向客户推送数据,走的的是HTTP协议.因为每条数据都比较小,而数据生成的频次也比较高,这就会频繁的建立HTTP连接,而且每次HTTP传输中携带的业务数据都很小,对网络的实际利用率不高.希望能够提高网络的利用率,并降低系统的负载. 分析 一个很自然的想法就是将多条数据一起发送,这里有几个关键点: 1.多条数据的聚合逻辑: 是攒够几条发送,还是按照时间周期发送.如果是攒够几条发送,在数据比较稀疏或者产生频率不那么稳定的时候,攒够需

  • 对python实现合并两个排序链表的方法详解

    输入两个单调递增的链表,输出两个链表合成后的链表,当然我们需要合成后的链表满足单调不减规则. 1.迭代方法 def Merge(self, pHead1, pHead2): p1, p2 = pHead1, pHead2 if p1 and p2: if p1.val < p2.val: head = p1 p1 = p1.next else: head = p2 p2 = p2.next cur = head elif p1: return p1 else: return p2 while p

  • TypeScript合并两个排序链表的方法详解

    目录 前言 思路分析 实现代码 测试用例 示例代码 前言 给定两个递增排序的链表,如何将这两个链表合并?合并后的链表依然按照递增排序.本文就跟大家分享一种解决方案,欢迎各位感兴趣的开发者阅读本文. 思路分析 经过前面的学习,我们知道了有关链表的操作可以用指针来完成.同样的,这个问题也可以用双指针的思路来实现: p1指针指向链表1的头节点 p2指针指向链表2的头节点 声明一个变量存储合并后的链表,比对两个指针指向的节点值大小: 如果p1指针指向的节点值比p2指向的值小,合并后的链表节点就取p1节点

  • JavaScript数组的栈方法与队列方法详解

    数组(Array)和对象(Object)应该是JavaScript中使用最多也是最频繁的两种类型了,Array提供了很多常用的方法:栈方法.队列方法.重排序方法.操作方法.位置方法.迭代方法等等. 1.Array的栈方法 栈是一种LIFO(Last-In-First-Out,后进先出)的数据结构,也就是最新添加的项最早被移除.栈中项的插入(push)和移除,只发生在一个位置--栈的顶部.ECMAScript为数组提供了push()和pop()方法,可以实现类似栈的行为.下面两图分别演示了入栈与出

  • jquery.tableSort.js表格排序插件使用方法详解

    本文实例为大家分享了jquery.tableSort.js表格排序的具体代码,供大家参考,具体内容如下 1.一定要引jQuery包,所有jq插件都是基于jQuery包的 2.如果想指定哪一栏不排序这样写 $("#mytable").tablesorter({headers:{5:{sorter:false}}}); 第5列的sorter为false就OK了 参考:http://www.jb51.net/article/105217.htm <!DOCTYPE html> &

  • TableSort.js表格排序插件使用方法详解

    本文实例为大家分享了TableSort.js表格排序的具体代码,供大家参考,具体内容如下 <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <title>TableSort</title> <style type="text/css"> table { bor

  • jquery拖拽自动排序插件使用方法详解

    本文为大家分享了jquery拖拽自动排序插件,供大家参考,具体内容如下 该插件并不是原生js写的,是基于jquery的,想看原生的话,请绕道而行. html: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Document</title> <script type="text/javas

  • java 中 阻塞队列BlockingQueue详解及实例

    java 中 阻塞队列BlockingQueue详解及实例 BlockingQueue很好的解决了多线程中数据的传输,首先BlockingQueue是一个接口,它大致有四个实现类,这是一个很特殊的队列,如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒.同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作.

  • Java阻塞队列BlockingQueue详解

    目录 队列的类型 数据结构 阻塞队列 BlockingQueue 常见的阻塞队列 BlockingQueue API ArrayBlockingQueue 源码简解 生产者消费者模式 延迟队列 DelayQueue 队列的类型 无限队列(unbounded queue) 无容量限定,只随存储变化 有限队列(bounded queue) 定义了最大容量 向无限队列添加元素的所有操作都将永远不会阻塞(也是线程安全的),因此它可以增长到非常大的容量. 使用无限阻塞队列 BlockingQueue 设计

  • Android 中ViewPager重排序与更新实例详解

    Android 中ViewPager重排序与更新实例详解 最近的项目中有栏目订阅功能,在更改栏目顺序以后需要更新ViewPager.类似于网易新闻的频道管理. 在重新排序之后调用了PagerAdapter的notifyDataSetChanged方法,发现ViewPager并没有更新,于是我开始跟踪源码,在调用PagerAdapter的notifyDataSetChanged方法后,会触发Viewpager的dataSetChanged方法. void dataSetChanged() { //

随机推荐