如何用.NETCore操作RabbitMQ

什么是RabbitMQ?

RabbitMQ是由erlang语言开发的一个基于AMQP(Advanced Message Queuing Protocol)协议的企业级消息队列中间件。可实现队列,订阅/发布,路由,通配符等工作模式。

为什么要使用RabbitMQ?

  • 异步处理:比如发送邮件,发送短信等不需要等待处理结果的操作
  • 应用解耦:比如下单成功后,通知仓库发货,不需要等待仓库回应,通过消息队列去通知仓库,降低应用间耦合程序,可并行开发两个功能模块
  • 流量削锋:在抢购或者其他的活动页,服务处于爆发式请求状态,如果直连数据库,数据库容易被拖垮。抢购商品也容易出现库存超卖的情况。通过队列可有效解决该问题。
  • 日志处理:在单机中,日志直接写入到文件目录中,但是在分布式应用中,日志需要有统一的处理机制,可通过消息队列统一由某个消费端做处理。
  • 消息通信:如生产端和消费端可通过队列进行异步通信

如何安装RabbitMQ?

Windows端

1.安装erlang语言运行环境
https://erlang.org/download/otp_win64_23.2.exe
下载后直接下一步即可

2.安装RabbitMQ
https://www.rabbitmq.com/install-windows.html
直接点击安装下一步即可按章

3.安装RabbitMQ的Web管理平台

RabbitMQ的管理平台是通过插件的形式使用,需要手动启用管理平台
在Windows下,RabbitMQ默认被安装到C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.14 下。
打开sbin ,在cmd或者powershell中执行

rabbitmq-plugins.bat enable rabbitmq_management

安装完成后,浏览器打开 http://localhost:15672/ 即可看到RabbitMQ的管理界面。输入默认账号密码 guest 成功登录。

Linux环境安装

1.Ubuntu:https://www.rabbitmq.com/install-debian.html

2.Centos:https://www.rabbitmq.com/install-rpm.html

RabbitMQ的基本概念

生产者

发送消息的端

消费者

获取消息并处理的端

Connection

一个终端连接。每一个Connection都可以在RabbitMQ后台看到

Channel

Channel是建立在Connection上的一个虚拟通信管道。一般情况下,往消息队列中写入多条消息,为了不每条消息都建立一个TCP连接,所以RabbitMQ的做法是多条消息可以公用一个Connection,大大提高MQ的负载能力。

Exchange

Exchange是一个虚拟交换机。每一条消息都必须要通过交换机才能能进入对应的队列,可以理解为网络设备中的交换机,是一个意思。

Queue

Queue是一个存储消息的内部对象,所有的Rabbit MQ消息都存储在Queue中。生产者所生产的消息会存储在Queue中,消费者获取的消息也是从Queue中获取。

如何在.NET Core中使用RabbitMQ?

nuget安装

dotnet add package RabbitMQ.Client

创建生产者

const string QUEUENAME = "HELLO_MQ";
//创建连接对象工厂
var factory = new ConnectionFactory()
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost",
    Port = 5672,  //RabbitMQ默认的端口
};

while (true)
{
    using var conn = factory.CreateConnection();
    var chanel = conn.CreateModel();

    chanel.QueueDeclare(QUEUENAME, true, false, false);
    Console.WriteLine("输入生产内容:");
    var input = Console.ReadLine();
    chanel.BasicPublish("", QUEUENAME, null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
}

在循环中,输入一个值,按下enter,即可推送一条消息到队列。

也可以直接在RabbitMQ的管理后台查看

可以看到我们发送的消息已经被RabbitMQ存储在Queue中了。只等某个幸运的消费者前来消费。

创建消费者

const string QUEUENAME = "HELLO_MQ";
var factory = new ConnectionFactory()
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost",
    Port = 5672,
};

var conn = factory.CreateConnection();
var chanel = conn.CreateModel();
chanel.QueueDeclare(QUEUENAME, true, false, false);
EventingBasicConsumer consumer = new EventingBasicConsumer(chanel);
consumer.Received += (a, e) =>
{
    Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
    chanel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
};
chanel.BasicConsume(QUEUENAME, false, consumer);

Console.WriteLine("启动成功");
Console.ReadLine();

启动成功后,consumer的Received方法,会收到一条来自MQ的消息,

如果处理完成后,不调用chennel的BasicAck方法,那么这条消息依然会存在,下次有消费者出现,会再次推送给消费者。

简单的RabbitMQ Hello World到这里就算完成了。接下来就是稍微高级一点的应用

RabbitMQ的工作模式

Work Queue 工作队列模式

工作队列模式的意思就是一个生产者对应多个消费者。RabbitMQ会使用轮询去给每个消费者发送消息。

publish/subscribe

发布订阅模式是属于比较用多的一种。

发布订阅,是由交换机发布消息给多个队列。多个队列再对应多个消费者。

发布订阅模式对应的交换机类型的fanout。

消费者

A

const string QUEUENAME = "HELLO_MQ_B";
const string TESTEXCHANGE = "TESTEXCHANGE";
var factory = new ConnectionFactory()
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost",
    Port = 5672,
};

var conn = factory.CreateConnection();
var channel = conn.CreateModel();
//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (a, e) =>
{
    Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
    channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
};
channel.BasicConsume(QUEUENAME, false, consumer);

Console.WriteLine("启动成功");
Console.ReadLine();

B

const string QUEUENAME = "HELLO_MQ";
const string TESTEXCHANGE = "TESTEXCHANGE";
var factory = new ConnectionFactory()
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost",
    Port = 5672,
};

var conn = factory.CreateConnection();
var channel = conn.CreateModel();
//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (a, e) =>
{
    Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
    channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
};
channel.BasicConsume(QUEUENAME, false, consumer);

Console.WriteLine("启动成功");
Console.ReadLine();

生产者

const string QUEUENAME = "HELLO_MQ";
const string QUEUENAME_B = "HELLO_MQ_B";
const string TESTEXCHANGE = "TESTEXCHANGE";

//创建连接对象工厂
var factory = new ConnectionFactory()
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost",
    Port = 5672,  //RabbitMQ默认的端口
};
using var conn = factory.CreateConnection();
while (true)
{

    var channel = conn.CreateModel();
    //定义交换机
    channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
    Console.WriteLine("输入生产内容:");
    var input = Console.ReadLine();
    channel.BasicPublish(TESTEXCHANGE,"", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
}

在生产者运行成功后,RabbitMQ后台会出现一个交换机,点击交换机会看到交换机下绑定了两个队列

从生产者发送消息到队列,两个消费者会同时收到消息

routing模式

routing模式对应的交换机类型是direct,和发布订阅模式的区别在于:routing模式下,可以指定一个routingkey,用于区分消息

生产者

var channel = conn.CreateModel();
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
//绑定队列到交换机
Console.WriteLine("输入生产内容:");
var input = Console.ReadLine();
channel.BasicPublish(TESTEXCHANGE, "INFO", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));

消费者 A

//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "INFO");

消费者 B

//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "ERROR");

绑定成功后,发送消息,消费者A可以收到消息,消费者B无法收到消息。

如果遇到指定routingKey生产一条消息,结果 AB消费者都收到的情况。建议在RabbitMQ后台的交换机下看一下绑定的Queue是否重复绑定了多个routingKey.

topic通配符模式

在通配符模式下,RabbitMQ使用模糊匹配来决定把消息推送给哪个生产者。通配符有两个符号来匹配routingKey

1.*匹配一个字符 如:*.qq.com 可匹配 1.qq.com

2.#匹配一个或者多个字符。 如:*.qq.com 可匹配 1.qq.com或者1111.qq.com

其他的操作基本和routing模式一样。

header模式

header模式是把routingkey放到header中.取消掉了routingKey。并使用一个字典传递 K、V的方式来匹配。
比如同时要给用户发送邮件和短信,可直接通过header的键值对来匹配绑定的值,把消息传递给发短信和邮件的生产者.

以上就是如何用.NETCore操作RabbitMQ的详细内容,更多关于.NETCore 操作RabbitMQ的资料请关注我们其它相关文章!

(0)

相关推荐

  • Django日志及中间件模块应用案例

    基于邮件通知的服务监控和告警系统 主要功能点: 配置专用日志格式记录耗时 日志格式: 'simple':{ 'format':'%(asctimme)s %(message)s' } 处理器 'statistics_handler':{ 'level':'DEBUG', 'class':'logging.handlers.RotatingFileHandler', 'filename':os.path.join(LOG_DIR,'backend.log') 'maxBytes':'1024*10

  • NetCore 3.0文件上传和大文件上传的限制详解

    NetCore文件上传两种方式 NetCore官方给出的两种文件上传方式分别为"缓冲"."流式".我简单的说说两种的区别, 1.缓冲:通过模型绑定先把整个文件保存到内存,然后我们通过IFormFile得到stream,优点是效率高,缺点对内存要求大.文件不宜过大. 2.流式处理:直接读取请求体装载后的Section 对应的stream 直接操作strem即可.无需把整个请求体读入内存, 以下为官方微软说法 缓冲 整个文件读入 IFormFile,它是文件的 C# 表

  • 快速了解如何在.NETCORE中使用Generic-Host建立主机

    .NETCORE 中的 Generic Host 本文以自己在工作中学习和使用.net core generic-host 作一个总结. 前言 在创建的ASPNETCORE项目中,我们可以在Main()中看见,我们通过IWebHostBuild创建了一个IWebHost,而微软提供了WebHost.CreateDefaultBuilder(args)来帮助我们更轻松得创建WebHost. 常常我们的需求不需要创建Web项目,比如后台任务,那么我们如何像使用AspNetCore一样创建控制台项目.

  • 在 asp.net core 的中间件中返回具体的页面的实现方法

    前言 在 asp.net core 中,存在着中间件这一概念,在中间件中,我们可以比过滤器更早的介入到 http 请求管道,从而实现对每一次的 http 请求.响应做切面处理,从而实现一些特殊的功能 在使用中间件时,我们经常实现的是鉴权.请求日志记录.全局异常处理等等这种非业务性的需求,而如果你有在 asp.net core 中使用过 swashbuckle(swagger).health check.mini profiler 等等这样的组件的话,你会发现,这些第三方的组件往往都提供了页面,允

  • aspnetcore 实现简单的伪静态化功能

    Intro 在我的活动室预约项目中,有一个公告模块,类似于新闻发布,个人感觉像新闻这种网页基本就是发布的时候编辑一次之后就再也不会改了,最适合静态化了, 静态化之后用户请求的就是静态文件基本不再需要服务器端查询数据库甚至服务器端渲染,可以一定程度上提升服务器的处理能力以及优化用户体验,而且这种静态化的url对 SEO 比较友好. 由于我的这个项目正在开发中,迁移起来不太方便,所以使用的是伪静态化,看上去是访问的 *.html,实际上并不是 html,而是需要服务器处理的. GetStarted

  • .NetCore之接口缓存的实现示例

    1.问题:我们平时做开发的时候肯定都有用到缓存这个功能,一般写法是在需要的业务代码里读取缓存.判断是否存在.不存在则读取数据库再设置缓存这样一个步骤.但是如果我们有很多地方业务都有用到缓存,我们就需要在每个地方都写关于缓存的代码,这样会造成很多重复代码,同时对业务侵入不利于后续的开发维护. 2.一般的解决办法是将缓存的功能提取出来,然后在需要用到缓存的地方调用即可.这样确实减少了很多重复代码,但这样还是会存在整个项目通用的缓存功能侵入业务代码,那我们有什么办法将缓存功能完全提取出来,达到业务代码

  • .NETCore添加区域Area代码实例解析

    一,如下图 二,构建好框架,将不必要的包删掉 三,添加路由 app.UseEndpoints(endpoints => { endpoints.MapControllerRoute( name: "default", pattern: "{controller=Home}/{action=Index}/{id?}"); endpoints.MapAreaControllerRoute( name: "areas", "areas&

  • Vue Router中应用中间件的方法

    中间件是我们在软件开发中的一个古老而强大的概念,当我们在应用程序中使用路由相关模式时,它非常有用. 如果您不太了解中间件的含义,Nodejs框架Express里的中间件可以帮助您了解它们的工作原理. 但是,中间件仅适用于后端吗? 不,当应用程序中有路由时,中间件在前端或后端中就会非常常见.比如现在流行的单页应用程序. 有一些示例可以说明,何时可以使用中间件: 不允许未登录用户访问您的网页. 仅允许某些类型的用户查看页面(角色:管理员,作者等) 数据采集. 重置设置或清理存储空间. 限制访问用户的

  • .NETCore Docker实现容器化与私有镜像仓库管理

    一.Docker介绍 Docker是用Go语言编写基于Linux操作系统的一些特性开发的,其提供了操作系统级别的抽象,是一种容器管理技术,它隔离了应用程序对基础架构(操作系统等)的依赖.相较于虚拟机而言,Docker共享的是宿主机的硬件资源,使用容器来提供独立的运行环境来运行应用.虚拟机则是基于Supervisor(虚拟机管理程序)使用虚拟化技术来提供隔离的虚拟机,在虚拟机的操作系统上提供运行环境!虽然两者都提供了很好的资源隔离,但很明显Docker的虚拟化开销更低! Docker涉及了三个核心

  • 如何用.NETCore操作RabbitMQ

    什么是RabbitMQ? RabbitMQ是由erlang语言开发的一个基于AMQP(Advanced Message Queuing Protocol)协议的企业级消息队列中间件.可实现队列,订阅/发布,路由,通配符等工作模式. 为什么要使用RabbitMQ? 异步处理:比如发送邮件,发送短信等不需要等待处理结果的操作 应用解耦:比如下单成功后,通知仓库发货,不需要等待仓库回应,通过消息队列去通知仓库,降低应用间耦合程序,可并行开发两个功能模块 流量削锋:在抢购或者其他的活动页,服务处于爆发式

  • Python操作RabbitMQ服务器实现消息队列的路由功能

    Python使用Pika库(安装:sudo pip install pika)可以操作RabbitMQ消息队列服务器(安装:sudo apt-get install rabbitmq-server),这里我们来看一下MQ相关的路由功能. 路由键的实现 比如有一个需要给所有接收端发送消息的场景,但是如果需要自由定制,有的消息发给其中一些接收端,有些消息发送给另外一些接收端,要怎么办呢?这种情况下就要用到路由键了. 路由键的工作原理:每个接收端的消息队列在绑定交换机的时候,可以设定相应的路由键.发送

  • C#操作RabbitMQ的完整实例

    一.下载RabbitMQ http://www.rabbitmq.com/install-windows.html 二.下载OTP http://www.erlang.org/downloads 三.安装OTP.RabbitMQ 四.配置RabbitMQ 找到bat的目录 执行相关命令 1.添加用户密码 rabbitmqctl add_user wenli wenli 2.设置wenli为管理员rabbitmqctl set_user_tags wenli administrator 3.启动R

  • 教你如何用python操作摄像头以及对视频流的处理

    实验介绍 此次实验帮助大家利用 OpenCV 去读取摄像头的视频流,你可以直接使用笔记本本身的摄像头,也可以用 USB 连接直接的摄像头.如果你在操作过程中,摄像头读取失败, 实验中还为你提供了几个问题排查步骤.当然,对视频进行操作时还需要讲解视频相关的编解码格式以及特定帧的读取.在实验的最后,还提供了 OpenCV 的项目实战:视频录制与视频读取. 知识点 视频录制 视频编解码格式 视频读取以及特定帧的读取 视频录制 使用 OpenCV 录制视频,主要涉及 OpenCV 的 VideoWrit

  • 如何用python 操作zookeeper

    ZooKeeper 简介 ZooKeeper 是一个分布式的.开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 Hbase 的重要组件.它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护.域名服务.分布式同步.组服务等.ZooKeeper 支持大部分开发语言,除了某些特定的功能只支持 Java 和 C.python 通过 kazoo 可以实现操作 ZooKeeper . 一.安装 这个简单,使用 pip 命令安装 pip3

  • 如何用Navicat操作MySQL

    前言: 日常使用 MySQL 的过程中,我们可能会经常使用可视化工具来连接 MySQL ,其中比较常用的就是 Navicat 了.平时也会遇到某些同学问, Navicat 怎么安装,如何使用等问题.本篇文章笔者将结合个人经验,分享下 Navicat 操作 MySQL 简易教程. 1.Navicat简介 Navicat 是一款成熟可靠的数据库管理工具,深受技术人员的喜爱.它是以直觉化的图形用户界面而建的,让你可以以安全并且简单的方式创建.组织.访问并共用信息. Navicat 可以用来对本机或远程

  • php如何用PDO操作大数据对象

    目录 什么是大数据对象 "大"通常意味着"大约 4kb 或以上",尽管某些数据库在数据达到"大"之前可以轻松地处理多达 32kb 的数据.大对象本质上可能是文本或二进制形式的,我们在 PDOStatement::bindParam() 或 PDOStatement::bindColumn() 调用中使用 PDO::PARAM_LOB 类型码可以让 PDO 使用大数据类型.PDO::PARAM_LOB 告诉 PDO 作为流来映射数据,以便能使用 P

  • 如何用python 操作MongoDB数据库

    目录 一.前言 二.操作 MongoDB 1.安装 pymongo 2.连接 MongoDB 3.选择数据库 4.选择集合 5.插入数据 6.查询 7.更新数据 8.删除 一.前言 MongoDB属于 NoSQL(非关系型数据库),是一个基于分布式文件存储的开源数据库系统. 二.操作 MongoDB 1.安装 pymongo python 使用第三方库来连接操作 MongoDB,所以我们首先安装此库. pip3 install pymongodb 2.连接 MongoDB 使用 MongoCli

  • python操作RabbitMq的三种工作模式

    一.简介: RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件.消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递于队列,由另一应用程序读取 完成通信.而作为中间件的 RabbitMq 无疑是目前最流行的消息队列之一. ​ RabbitMq 应用场景广泛: 系统的高可用:日常生活当中各种商城秒杀,高流量,高并发的场景.当服务器接收到如此大量请求处理业务时,有宕机的风险.某些业务可能极其复杂,但这部分不是高时效性,不需要立即反馈给用户,我们可以将这部

  • 详解Python操作RabbitMQ服务器消息队列的远程结果返回

    先说一下笔者这里的测试环境:Ubuntu14.04 + Python 2.7.4 RabbitMQ服务器 sudo apt-get install rabbitmq-server Python使用RabbitMQ需要Pika库 sudo pip install pika 远程结果返回 消息发送端发送消息出去后没有结果返回.如果只是单纯发送消息,当然没有问题了,但是在实际中,常常会需要接收端将收到的消息进行处理之后,返回给发送端. 处理方法描述:发送端在发送信息前,产生一个接收消息的临时队列,该队

随机推荐