.NetCore利用BlockingCollection实现简易消息队列

消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa。

我们用BlockingCollection来实现简单的消息队列。

BlockingCollection实现了生产者/消费者模式,是对IProducerConsumerCollection<T>接口的实现。与其他Concurrent集合一样,每次Add或Take元素,都会导致对集合的lock。只有当确定需要在内存中创建一个生产者,消费者模式时,再考虑这个类。

MSDN中的示例用法:

using (BlockingCollection<int> bc = new BlockingCollection<int>())
  {
    Task.Factory.StartNew(() =>
    {
      for (int i = 0; i < 1000; i++)
      {
        bc.Add(i);
        Thread.Sleep(50);
      }

      // Need to do this to keep foreach below from hanging
      bc.CompleteAdding();
    });

    // Now consume the blocking collection with foreach.
    // Use bc.GetConsumingEnumerable() instead of just bc because the
    // former will block waiting for completion and the latter will
    // simply take a snapshot of the current state of the underlying collection.
    foreach (var item in bc.GetConsumingEnumerable())
    {
      Console.WriteLine(item);
    }
  }

实现消息队列

用Vs2017创建一个控制台应用程序。创建DemoQueueBlock类,封装一些常用判断。

  • HasEle,判断是否有元素
  • Add向队列中添加元素
  • Take从队列中取出元素

为了不把BlockingCollection直接暴漏给使用者,我们封装一个DemoQueueBlock类

  /// <summary>
  /// BlockingCollection演示消息队列
  /// </summary>
  /// <typeparam name="T"></typeparam>
  public class DemoQueueBlock<T> where T : class
  {
    private static BlockingCollection<T> Colls;
    public DemoQueueBlock()
    {

    }
    public static bool IsComleted() {
      if (Colls != null && Colls.IsCompleted) {
        return true;
      }
      return false;
    }
    public static bool HasEle()
    {
      if (Colls != null && Colls.Count>0)
      {
        return true;
      }
      return false;
    }

    public static bool Add(T msg)
    {
      if (Colls == null)
      {
        Colls = new BlockingCollection<T>();
      }
      Colls.Add(msg);
      return true;
    }
    public static T Take()
    {
      if (Colls == null)
      {
        Colls = new BlockingCollection<T>();
      }
      return Colls.Take();
    }
  }

  /// <summary>
  /// 消息体
  /// </summary>
  public class DemoMessage
  {
    public string BusinessType { get; set; }
    public string BusinessId { get; set; }
    public string Body { get; set; }
  }

添加元素进队列

通过控制台,添加元素

      //添加元素
      while (true)
      {
        Console.WriteLine("请输入队列");
        var read = Console.ReadLine();
        if (read == "exit")
        {
          return;
        }

        DemoQueueBlock<DemoMessage>.Add(new DemoMessage() { BusinessId = read });
      }

消费队列

通过判断IsComleted,来确定是否获取队列

 Task.Factory.StartNew(() =>
      {
        //从队列中取元素。
        while (!DemoQueueBlock<DemoMessage>.IsComleted())
        {
          try
          {
            var m = DemoQueueBlock<DemoMessage>.Take();
           Console.WriteLine("已消费:" + m.BusinessId);
          }
          catch (Exception ex)
          {
            Console.WriteLine(ex.Message);
          }
        }
      });

查看运行结果

运行结果

这样我们就实现了简易的消息队列。

示例源码:简易队列

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • MongoDB固定集合(capped collection)的知识小结

    一 . 什么是固定集合 MongoDB中有一种特殊类型的集合,值得我们特别留意,那就是固定集合(capped collection). 固定集合可以声明collection的容量大小,其行为类似于循环队列.数据插入时,新文档会被插入到队列的末尾,如果队列已经被占满,那么最老的文档会被之后插入的文档覆盖. 固定集合特性:固定集合很像环形队列,如果空间不足,最早的文档就会被删除,为新的文档腾出空间.一般来说,固定集合适用于任何想要自动淘汰过期属性的场景. 固定集合应用场景 比如日志文件,聊天记录,通

  • ios基于UICollectionView实现横向瀑布流

    在网上找了许久,一直没有发现有提供横向瀑布流效果的.在项目中用到了我就在垂直瀑布流的基础上,进行了修改,做出了横向瀑布流的效果.同时也对一些UICollectionView的属性进行简单的注释,方便以后查阅. 1.首先要写一个继承与NSObject的布局类,记录每一行(列)目前的宽度(高度).再添加一个新的cell的时候进行判断比较,添加到最短的那一行或一列上. 2.横向的布局类入下,垂直的话就是讲对应的X Y轴数据进行调整即可. WaterfallFlowLayout为布局类,继承与NSObj

  • collection集合体系与并发修改异常的解决方法

    collection是单列集合的顶层接口,下面还包括了两个常用子接口  List.set List: list接口有两个实现的子类:特点是:有序且可重复 ArrayList的数据结构是数组结构 LinkedList的数据结构是链表结构 1.ArrayList:特点:查询快 增删慢  初始容量大小为10 扩充容量算法为    ((旧容量 * 3) / 2) + 1 如果你知道你的arrayList 会达到多少容量,可以在初始化的时候就指定,能节省扩容的性能开支 2.LinkedList:特点: 

  • iOS Swift利用UICollectionView实现无限轮播功能(原理)详解

    前言 作为一个资深(自认为)iOS程序猿,会经常用到轮播图,上一次使用UIScrollView实现无限轮播的效果,这一次在Swift语言中,我使用UICollectionView再为大家讲解一次无限轮播的实现原理. 先上图: UICollectionView-无限轮播.gif 首先需要实现了就是UICollectionView的分页,这个很简单: collectionView.isPagingEnabled = true 接下来就是原理,在UICollectionView的两端需要先添加两张图片

  • iOS开发UICollectionView实现拖拽效果

    一.介绍 iOS9提供API实现单元格排序功能,使用UICollectionView及其代理方法.iOS9之后有自带方法可以实现该效果,只需添加长按手势,实现手势方法和调用iOS9的API交换数据,iOS9之前需要自己写方法实现这效果,除了要添加长按手势,这里还需要利用截图替换原理,手动计算移动位置来处理视图交换和数据交换. 二.方法和步骤 1.创建工程项目和视图控制器,如下图 2.声明对象和设置代理和数据源代理 @interface ViewController ()<UICollection

  • JAVA集合框架工具类自定义Collections集合方法

    项目中有需要多次统计 某些集合中 的某个属性值,所以考虑封装一个方法,让其其定义实现计算方式. 话不多说,看代码: 1.封装的自定义集合工具类:CollectionsCustom package com.test.util; import java.util.Collection; import org.apache.commons.collections.CollectionUtils; /** * 自定义集合处理类 */ public class CollectionsCustom { /*

  • Python中collections模块的基本使用教程

    前言 之前认识了python基本的数据类型和数据结构,现在认识一个高级的:Collections,一个模块主要用来干嘛,有哪些类可以使用,看__init__.py就知道 '''This module implements specialized container datatypes providing alternatives to Python's general purpose built-in containers, dict, list, set, and tuple. * named

  • Java中Collection、List、Set、Map之间的关系总结

    初学java,单个的接触有点迷糊,所以总结下他们的关系 一.关系 Collection --List:以特定顺序存储 --ArrayList.LinkList.Vector --Set:不能包含重复的元素 --HashSet.TreeSet Map --HashMap.HashTable.TreeMap 二.分别讲解 Collection:Collection是一个父接口,List和Set是继承自他的子接口,Collection是最基本的集合接口,Java SDK中不提供直接继承自Collect

  • python的常用模块之collections模块详解

    认识模块 什么是模块? 常见的场景:一个模块就是一个包含了python定义和声明的文件,文件名就是模块名字加上.py的后缀. 但其实import加载的模块分为四个通用类别:   1 使用python编写的代码(.py文件)   2 已被编译为共享库或DLL的C或C++扩展   3 包好一组模块的包   4 使用C编写并链接到python解释器的内置模块 为何要使用模块? 如果你退出python解释器然后重新进入,那么你之前定义的函数或者变量都将丢失,因此我们通常将程序写到文件中以便永久保存下来,

  • Mybatis中collection和association的使用区别详解

    最近一直把collection和association弄混,所以为了增强自己的记忆,就撸一个关系出来算是总结罢了 1. 关联-association 2. 集合-collection 比如同时有User.java和Card.java两个类 User.java如下: public class User{ private Card card_one; private List<Card> card_many; } 在映射card_one属性时用association标签, 映射card_many时

随机推荐