C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

1:RabbitMQ是个啥?(专业术语参考自网络)

 RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

  RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库

2:使用RabbitMQ有啥好处?

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。

对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。

RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,

3:RabbitMq的安装以及环境搭建等:

网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!

3.1:运行容器的命令如下:

docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management

4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?

4.1什么时候使用MQ?

1)数据驱动的任务依赖

2)上游不关心多下游执行结果

3)异步返回执行时间长

4.2什么时候不使用MQ?

需要实时关注执行结果 (eg:同步调用)

5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)

6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:

Code:

//简单生产端 ui调用者

using System;
namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;

  class Program
  {
    static void Main(string[] args)
    {
        //就是简单的队列,生产者
        Console.WriteLine("====RabbitMqPublishDemo====");
        for (int i = 0; i < 500; i++)
        {
          ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
        }
        Console.WriteLine("生成完毕!");
        Console.ReadLine();
    }
  }
}

/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{

  using (IConnection conn = connectionFactory.CreateConnection())
  {
    using (IModel channel = conn.CreateModel())
    {
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var msgBody = Encoding.UTF8.GetBytes(msg);
      channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
    }
  }
}

//简单消费端
using System;

namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;

  class Program
  {
    static void Main(string[] args)
    {
      Console.WriteLine("====RabbitMqConsumerDemo====");
      ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
      {
        Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
      });
      Console.ReadLine();
    }
  }
}

   #region 简单生产者后端逻辑
    /// <summary>
    /// 简单消费者
    /// </summary>
    /// <param name="queueName">队列名称</param>
    /// <param name="isBasicNack">失败后是否自动放到队列</param>
    /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
    public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
    {
      Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
      IConnection conn = connectionFactory.CreateConnection();
      IModel channel = conn.CreateModel();
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (sender, ea) =>
      {
        byte[] bymsg = ea.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bymsg);
        if (handleMsgStr != null)
        {
          handleMsgStr.Invoke(msg);
        }
        else
        {
          Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
        }
      };
      channel.BasicConsume(queueName, autoAck: true, consumer);
    }
    #endregion

7:Work模式

//简单生产端 ui调用者

using System;
namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;

  class Program
  {
    static void Main(string[] args)
    {
        //就是简单的队列,生产者
        Console.WriteLine("====RabbitMqPublishDemo====");
        for (int i = 0; i < 500; i++)
        {
          ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
        }
        Console.WriteLine("生成完毕!");
        Console.ReadLine();
    }
  }
}

/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{

  using (IConnection conn = connectionFactory.CreateConnection())
  {
    using (IModel channel = conn.CreateModel())
    {
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var msgBody = Encoding.UTF8.GetBytes(msg);
      channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
    }
  }
}

//简单消费端
using System;

namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;

  class Program
  {
    static void Main(string[] args)
    {
      Console.WriteLine("====RabbitMqConsumerDemo====");
      ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
      {
        Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
      });
      Console.ReadLine();
    }
  }
}

   #region 简单生产者后端逻辑
    /// <summary>
    /// 简单消费者
    /// </summary>
    /// <param name="queueName">队列名称</param>
    /// <param name="isBasicNack">失败后是否自动放到队列</param>
    /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
    public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
    {
      Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
      IConnection conn = connectionFactory.CreateConnection();
      IModel channel = conn.CreateModel();
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (sender, ea) =>
      {
        byte[] bymsg = ea.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bymsg);
        if (handleMsgStr != null)
        {
          handleMsgStr.Invoke(msg);
        }
        else
        {
          Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
        }
      };
      channel.BasicConsume(queueName, autoAck: true, consumer);
    }
    #endregion

8:Fanout

Code:

//就如下的code, 多次生产,3个消费者都可以自动开始消费

//生产者
using System;
namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;
  class Program
  {
    static void Main(string[] args)
    {
      for (int i = 0; i < 500; i++)
      {
        ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :发布消息成功{i}");
      }
      Console.WriteLine("工作队列模式 生成完毕......!");
      Console.ReadLine();
    }
  }
}

//生产者后端逻辑
public static void PublishWorkQueueModel(string queueName, string msg)
    {
      using (var connection = connectionFactory.CreateConnection())
      using (var channel = connection.CreateModel())
      {
        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        var body = Encoding.UTF8.GetBytes(msg);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
        Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
      }
    }

//work消费端
using System;

namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;
  class Program
  {
    static void Main(string[] args)
    {
      Console.WriteLine("====Work模式开启了====");
      ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>
      {
        Console.WriteLine($"work模式获取到消息{msg}");
      });
      Console.ReadLine();
    }
  }
}

//work后端逻辑
    public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
    {
      var connection = connectionFactory.CreateConnection();
      var channel = connection.CreateModel();

      channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
      channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

      var consumer = new EventingBasicConsumer(channel);
      Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");

      consumer.Received += (sender, ea) =>
      {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        if (handserMsg != null)
        {
          if (!string.IsNullOrEmpty(message))
          {
            handserMsg.Invoke(message);
          }
        }
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
      };
      channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

9:Direct

Code:

//同一个消息会被多个订阅者消费

//发布者
using System;

namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;

  class Program
  {
    static void Main(string[] args)
    {

      #region 发布订阅模式,带上了exchange
      for (int i = 0; i < 500; i++)
      {
        ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"发布的消息是:{i}");
      }
      Console.WriteLine("发布ok!");
      #endregion
      Console.ReadLine();
    }
  }
}
//发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout
  public static void PublishExchangeModel(string exchangeName, string message)
    {
      using (var connection = connectionFactory.CreateConnection())
      using (var channel = connection.CreateModel())
      {
        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
        Console.WriteLine($" Sent {message}");
      }
    }

//订阅者
using System;
namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;
  class Program
  {
    static void Main(string[] args)
    {

      #region 发布订阅模式 Exchange
      ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>
      {
        Console.WriteLine($"订阅到消息:{msg}");
      });
      #endregion
      Console.ReadLine();
    }
  }
}

//订阅者后端的逻辑
 public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
    {
      var connection = connectionFactory.CreateConnection();
      var channel = connection.CreateModel();

      channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉

      var queueName = channel.QueueDeclare().QueueName;
      channel.QueueBind(queue: queueName,
               exchange: exchangeName,
               routingKey: "");

      Console.WriteLine(" Waiting for msg....");

      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (model, ea) =>
      {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        if (handlerMsg != null)
        {
          if (!string.IsNullOrEmpty(message))
          {
            handlerMsg.Invoke(message);
          }
        }
        else
        {
          Console.WriteLine($"订阅到消息:{message}");
        }
      };
      channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    }

到此这篇关于C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)的文章就介绍到这了,更多相关C#使用RabbitMq队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • C#实现rabbitmq 延迟队列功能实例代码

    最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就

  • C#调用RabbitMQ实现消息队列的示例代码

    前言 我在刚接触使用中间件的时候,发现,中间件的使用并不是最难的,反而是中间件的下载,安装,配置才是最难的. 所以,这篇文章我们从头开始学习RabbitMq,真正的从头开始. 关于消息队列 其实消息队列没有那么神秘,我们这样想一下,用户访问网站,最终是要将数据以HTTP的协议的方式,通过网络传输到主机的某个端口上的. 那么,接收数据的方式是什么呢?自然是端口监听啦. 那消息队列是什么就很好解释了? 它就是端口监听,接到数据后,将数据排列起来. 那这件事,我们不用中间件能做吗? 当然能做啦,写个T

  • C#操作RabbitMQ的完整实例

    一.下载RabbitMQ http://www.rabbitmq.com/install-windows.html 二.下载OTP http://www.erlang.org/downloads 三.安装OTP.RabbitMQ 四.配置RabbitMQ 找到bat的目录 执行相关命令 1.添加用户密码 rabbitmqctl add_user wenli wenli 2.设置wenli为管理员rabbitmqctl set_user_tags wenli administrator 3.启动R

  • c# rabbitmq 简单收发消息的示例代码

    发布消息:(生产者) /// <summary> /// 发送消息 /// </summary> /// <param name="queue">队列名</param> /// <param name="message">消息内容</param> private static void PublishInfo(string queue, string message) { try { var f

  • C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

    1:RabbitMQ是个啥?(专业术语参考自网络) RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件). RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的.所有主要的编程语言均有与代理接口通讯的客户端库 2:使用RabbitMQ有啥好处? RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现. AMQP的主要特征是面向消息.队列.路由(包

  • golang gin 监听rabbitmq队列无限消费的案例代码

    golang gin 监听rabbitmq队列无限消费 连接rabbitmq package database import ( "github.com/streadway/amqp" "log" "reflect" "yy-data-processing/common/config" ) var RabbitConn *amqp.Connection var RabbitChannel *amqp.Channel func

  • python操作RabbitMq的三种工作模式

    一.简介: RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件.消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递于队列,由另一应用程序读取 完成通信.而作为中间件的 RabbitMq 无疑是目前最流行的消息队列之一. ​ RabbitMq 应用场景广泛: 系统的高可用:日常生活当中各种商城秒杀,高流量,高并发的场景.当服务器接收到如此大量请求处理业务时,有宕机的风险.某些业务可能极其复杂,但这部分不是高时效性,不需要立即反馈给用户,我们可以将这部

  • Spring学习笔记3之消息队列(rabbitmq)发送邮件功能

    rabbitmq简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBM WEBSPHERE MQ. 本节的内容是用户注册时,将邮

  • 利用Python学习RabbitMQ消息队列

    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. rabbitmq基本管理命令: 一步启动Erlang node和Rabbit应用:sudo rabbitmq-serv

  • python实现RabbitMQ的消息队列的示例代码

    最近在研究redis做消息队列时,顺便看了一下RabbitMQ做消息队列的实现.以下是总结的RabbitMQ中三种exchange模式的实现,分别是fanout, direct和topic. base.py: import pika # 获取认证对象,参数是用户名.密码.远程连接时需要认证 credentials = pika.PlainCredentials("admin", "admin") # BlockingConnection(): 实例化连接对象 # C

  • 实战干货之基于SpringBoot的RabbitMQ多种模式队列

    目录 环境准备 安装RabbitMQ 依赖 连接配置 五种队列模式实现 1 点对点的队列 2 工作队列模式Work Queue 3 路由模式Routing 4 发布/订阅模式Publish/Subscribe 5 通配符模式Topics 总结 环境准备 安装RabbitMQ 由于RabbitMQ的安装比较简单,这里不再赘述.可自行到官网下载http://www.rabbitmq.com/download.html 依赖 SpringBoot项目导入依赖 <dependency> <gro

  • SpringBoot整合RabbitMQ实现交换机与队列的绑定

    目录 简介 配置方法概述 法1:配置类(简洁方法)(推荐) 法2:配置类(繁琐方法)(不推荐) 法3:使用方配置(不推荐) 法4:MQ服务端网页(不推荐) 简介 本文用实例介绍SpringBoot中RabbitMQ如何绑定交换机(交换器)与队列. 配置方法概述 交换机 下边两种方式等价. ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME).durable(true).build(); new TopicExchange(EXCHANGE_T

  • 浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式;2.并行方式 a.串

  • rabbitmq学习系列教程之消息应答(autoAck)、队列持久化(durable)及消息持久化

    目录 一.前言 二.autoAck参数的讨论 三.rabbitmq队列持久化操作 四.2019.11.04问题补充 五.2019.11.07消息的持久化 六.2022.02.09增加队列持久化说明 结语 一.前言 Boolean autoAck = false; channel.basicConsume(queue_name, autoAck ,consumer); 在simple queue 和 work queue(轮询) 处理中,我们设置的消费者的消息监听都采用 channel.basic

随机推荐