运用.net core中实例讲解RabbitMQ

目录
  • 一、RabbitMQ简介
    • (1) AMQP协议
    • (2)AMQP专业术语
    • (3)RabbitMQ整体架构
  • 二、安装RabbitMQ
  • 三、RabbitMQ六种队列模式在.NetCore中使用
    • (1)简单队列
    • (2)工作队列模式
    • (3)发布订阅模式
    • (4)路由模式(推荐使用)
    • (5)主题模式
    • (6)RPC模式
  • 总结

一、RabbitMQ简介

是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang(高并发语言)语言来编写的,并且RabbitMQ是基于AMQP协议的。

(1) AMQP协议

Advanced Message Queuing Protocol(高级消息队列协议)

(2)AMQP专业术语

(多路复用->在同一个线程中开启多个通道进行操作)

  • Server:又称broker,接受客户端的链接,实现AMQP实体服务
  • Connection:连接,应用程序与broker的网络连接
  • Channel:网络信道,几乎所有的操作都在channel中进行,Channel是进行消息读写的通道。客户端可以建立多个channel,每个channel代表一个会话任务。
  • Message:消息,服务器与应用程序之间传送的数据,由Properties和Body组成.Properties可以对消息进行修饰,必须消息的优先级、延迟等高级特性;Body则是消息体内容。
  • virtualhost: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个virtual host里面可以有若干个Exchange和Queue,同一个Virtual Host 里面不能有相同名称的Exchange 或 Queue。
  • Exchange:交换机,接收消息,根据路由键转单消息到绑定队列
  • Binding: Exchange和Queue之间的虚拟链接,binding中可以包换routing key
  • Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息。(如负载均衡)

(3)RabbitMQ整体架构

ClientA(生产者)发送消息到Exchange1(交换机),同时带上RouteKey(路由Key),Exchange1找到绑定交换机为它和绑定传入的RouteKey的队列,把消息转发到对应的队列,消费者Client1,Client2,Client3只需要指定对应的队列名即可以消费队列数据。

交换机和队列多对多关系,实际开发中一般是一个交换机对多个队列,防止设计复杂化。

二、安装RabbitMQ

安装方式不影响下面的使用,这里用Docker安装

#15672端口为web管理端的端口,5672为RabbitMQ服务的端口
docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management

输入:ip:5672访问验证。

建一个名为develop的Virtual host(虚拟主机)使用,项目中一般是一个项目建一个Virtual host用,能够隔离队列。

切换Virtual host

三、RabbitMQ六种队列模式在.NetCore中使用

(1)简单队列

最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式

描述:一个生产者 P 发送消息到队列 Q,一个消费者 C 接收

建一个RabbitMQHelper.cs类

/// <summary>
    /// RabbitMQ帮助类
    /// </summary>
    public class RabbitMQHelper
    {
        private static ConnectionFactory factory;
        private static object lockObj = new object();
        /// <summary>
        /// 获取单个RabbitMQ连接
        /// </summary>
        /// <returns></returns>
        public static IConnection GetConnection()
        {
            if (factory == null)
            {
                lock (lockObj)
                {
                    if (factory == null)
                    {
                         factory = new ConnectionFactory
                        {
                            HostName = "172.16.2.84",//ip
                            Port = 5672,//端口
                            UserName = "admin",//账号
                            Password = "123456",//密码
                            VirtualHost = "develop" //虚拟主机
                        };
                    }
                }
            }
            return factory.CreateConnection();
        }
    }

生产者代码

新建发送类Send.cs

public static void SimpleSendMsg()
        {
            string queueName = "simple_order";//队列名
            //创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {//创建队列
                    channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    for (var i = 0; i < 10; i++)
                    {
                        string message = $"Hello RabbitMQ MessageHello,{i + 1}";
                        var body = Encoding.UTF8.GetBytes(message);//发送消息
                        channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body);
                        Console.WriteLine($"发送消息到队列:{queueName},内容:{message}");
                    }
                }
            }
        }

创建队列参数解析:

durable:是否持久化。

exclusive:排他队列,只有创建它的连接(connection)能连,创建它的连接关闭,会自动删除队列。

autoDelete:被消费后,消费者数量都断开时自动删除队列。

arguments:创建队列的参数。

发送消息参数解析:

exchange:交换机,为什么能传空呢,因为RabbitMQ内置有一个默认的交换机,如果传空时,就会用默认交换机。

routingKey:路由名称,这里用队列名称做路由key。

mandatory:true告诉服务器至少将消息route到一个队列种,否则就将消息return给发送者;false:没有找到路由则消息丢弃。

执行效果:

队列产生10条消息。

消费者代码

新建Recevie.cs类

public static void SimpleConsumer()
        {
            string queueName = "simple_order";
            var connection = RabbitMQHelper.GetConnection();
            {
                //创建信道
                var channel = connection.CreateModel();
                {
                    //创建队列
                     channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    int i = 0;
                    consumer.Received += (model, ea) =>
                    {
                        //消费者业务处理
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},队列{queueName}消费消息长度:{message.Length}");
                        i++;
                    };
                    channel.BasicConsume(queueName, true, consumer);
                }
            }
        }

消费者只需要知道队列名就可以消费了,不需要Exchange和routingKey。

注:消费者这里有一个创建队列,它本身不需要,是预防消费端程序先执行,没有队列会报错。

执行效果:

消息已经被消费完。

(2)工作队列模式

一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式

生产者P发送消息到队列,多个消费者C消费队列的数据。

工作队列也称为公平性队列模式,循环分发,RabbitMQ将按顺序将每条消息发送给下一个消费者,每个消费者将获得相同数量的消息。

生产者

Send.cs代码:

/// <summary>
        /// 工作队列模式
        /// </summary>
        public static void WorkerSendMsg()
        {
            string queueName = "worker_order";//队列名
            //创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {
                    //创建队列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //消息持久化
                    for ( var i=0;i<10;i++)
                    {
                        string message = $"Hello RabbitMQ MessageHello,{i+1}";
                        var body = Encoding.UTF8.GetBytes(message);
                        //发送消息到rabbitmq
                        channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: properties, body);
                        Console.WriteLine($"发送消息到队列:{queueName},内容:{message}");
                    }
                }
            }
        }

参数durable:true,需要持久化,实际项目中肯定需要持久化的,不然重启RabbitMQ数据就会丢失了。

执行效果:

写入10条数据,有持久化标识D

消费端

Recevie代码:

public static void WorkerConsumer()
        {
            string queueName = "worker_order";
            var connection = RabbitMQHelper.GetConnection();
            {
                //创建信道
                var channel = connection.CreateModel();
                {
                    //创建队列
                    channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    //prefetchCount:1来告知RabbitMQ,不要同时给一个消费者推送多于 N 个消息,也确保了消费速度和性能
                    channel.BasicQos(prefetchSize: 0, prefetchCount:1, global: false);
                    int i = 1;
                    int index = new Random().Next(10);
                    consumer.Received += (model, ea) =>
                    {
                        //处理业务
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},消费者:{index},队列{queueName}消费消息长度:{message.Length}");
                         channel.BasicAck(ea.DeliveryTag, false); //消息ack确认,告诉mq这条队列处理完,可以从mq删除了               Thread.Sleep(1000);                        i++;
                    };
                    channel.BasicConsume(queueName,autoAck:false, consumer);
                }
            }
        }

BasicQos参数解析:

prefetchSize:每条消息大小,一般设为0,表示不限制。

prefetchCount:1,作用限流,告诉RabbitMQ不要同时给一个消费者推送多于N个消息,消费者会把N条消息缓存到本地一条条消费,如果不设,RabbitMQ会进可能快的把消息推到客户端,导致客户端内存升高。设置合理可以不用频繁从RabbitMQ 获取能提升消费速度和性能,设的太多的话则会增大本地内存,需要根据机器性能合理设置,官方建议设为30。

global:是否为全局设置。

这些限流设置针对消费者autoAck:false时才有效,如果是自动Ack的,限流不生效。

执行两个消费者,效果:

可以看到消费者号的标识,8,2,8,2是平均的,一个消费者5个,RabbitMQ上也能看到有2个消费者,Unacked数是2,因为每个客户端的限流数是1。

工作队列模式也是很常用的队列模式。

(3)发布订阅模式

Pulish/Subscribe,无选择接收消息,一个消息生产者,一个交换机(交换机类型为fanout),多个消息队列,多个消费者。称为发布/订阅模式

在应用中,只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。

生产者P只需把消息发送到交换机X,绑定这个交换机的队列都会获得一份一样的数据。

应用场景:适合于用同一份数据源做不同的业务。

生产者代码

/// <summary>
        /// 发布订阅, 扇形队列
        /// </summary>
        public static void SendMessageFanout()
        {
            //创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {
                    string exchangeName = "fanout_exchange";
                    //创建交换机,fanout类型
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
                    string queueName1 = "fanout_queue1";
                    string queueName2 = "fanout_queue2";
                    string queueName3 = "fanout_queue3";
                    //创建队列
                    channel.QueueDeclare(queueName1, false, false, false);
                    channel.QueueDeclare(queueName2, false, false, false);
                    channel.QueueDeclare(queueName3, false, false, false);

                    //把创建的队列绑定交换机,routingKey不用给值,给了也没意义的
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "");
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "");
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //消息持久化
                    //向交换机写10条消息
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Fanout {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "", null, body);
                        Console.WriteLine($"发送Fanout消息:{message}");
                    }
                }
            }
        }

执行代码:

向交换机发送10条消息,则绑定这个交换机的3个队列都会有10条消息。

消费端的代码和工作队列的一样,只需知道队列名即可消费,声明时要和生产者的声明一样。

(4)路由模式(推荐使用)

在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。

上图是一个结合日志消费级别的配图,在路由模式它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的direct模式。

生产者P发送数据是要指定交换机(X)和routing发送消息 ,指定的routingKey=error,则队列Q1和队列Q2都会有一份数据,如果指定routingKey=into,或=warning,交换机(X)只会把消息发到Q2队列。

生产者代码

/// <summary>
        /// 路由模式,点到点直连队列
        /// </summary>
        public static void SendMessageDirect()
        {
            //创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {
                    //声明交换机对象,fanout类型
                    string exchangeName = "direct_exchange";
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
                    //创建队列
                    string queueName1 = "direct_errorlog";
                    string queueName2 = "direct_alllog";
                    channel.QueueDeclare(queueName1, true, false, false);
                    channel.QueueDeclare(queueName2, true, false, false);

                    //把创建的队列绑定交换机,direct_errorlog队列只绑定routingKey:error
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "error");
                    //direct_alllog队列绑定routingKey:error,info
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "info");
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "error");
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //消息持久化
                    //向交换机写10条错误日志和10条Info日志
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Direct {i + 1} error Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "error", properties, body);
                        Console.WriteLine($"发送Direct消息error:{message}");

                        string message2 = $"RabbitMQ Direct {i + 1} info Message";
                        var body2 = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "info", properties, body2);
                        Console.WriteLine($"info:{message2}");

                    }
                }
            }
        }

这里创建一个direct类型的交换机,两个路由key,一个error,一个info,两个队列,一个队列只绑定error,一个队列绑定error和info,向error和info各发10条消息。

执行代码:

查看RabbitMQ管理界面,direct_errorlog队列10条,而direct_alllog有20条,因为direct_alllog队列两个routingKey的消息都进去了。

点进去看下两个队列绑定的交换机和routingKey

消费者代码

消费者和工作队列一样,只需根据队列名消费即可,这里只消费direct_errorlog队列作示例

public static void DirectConsumer()
        {
            string queueName = "direct_errorlog";
            var connection = RabbitMQHelper.GetConnection();
            {
                //创建信道
                var channel = connection.CreateModel();
                {
                    //创建队列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    ///prefetchCount:1来告知RabbitMQ,不要同时给一个消费者推送多于 N 个消息,也确保了消费速度和性能
                    ///global:是否设为全局的
                    ///prefetchSize:单条消息大小,通常设0,表示不做限制
                    //是autoAck=false才会有效
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    int i = 1;
                    consumer.Received += (model, ea) =>
                    {
                        //处理业务
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},队列{queueName}消费消息长度:{message.Length}");
                        channel.BasicAck(ea.DeliveryTag, false); //消息ack确认,告诉mq这条队列处理完,可以从mq删除了
                        i++;
                    };
                    channel.BasicConsume(queueName, autoAck: false, consumer);
                }
            }
        }

普通场景中推荐使用路由模式,因为路由模式有交换机,有路由key,能够更好的拓展各种应用场景。

(5)主题模式

topics(主题)模式跟routing路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由键 routingKey,类似于SQL中 = 和 like 的关系。

P 表示为生产者、 X 表示交换机、C1C2 表示为消费者,红色表示队列。

topics 模式与 routing 模式比较相近,topics 模式不能具有任意的 routingKey,必须由一个英文句点号“.”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词),比如 "lazy.orange.a"。topics routingKey 中可以存在两种特殊字符"*"与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

以上图为例:

如果发送消息的routingKey设置为:

aaa.orange.rabbit,那么消息会路由到Q1与Q2,

routingKey=aaa.orange.bb的消息会路由到Q1,

routingKey=lazy.aa.bb.cc的消息会路由到Q2;

routingKey=lazy.aa.rabbit的消息会路由到 Q2(只会投递给Q2一次,虽然这个routingKey 与 Q2 的两个 bindingKey 都匹配);

没匹配routingKey的消息将会被丢弃。

生产者代码

public static void SendMessageTopic()
        {
            //创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {
                    //声明交换机对象,fanout类型
                    string exchangeName = "topic_exchange";
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                    //队列名
                    string queueName1 = "topic_queue1";
                    string queueName2 = "topic_queue2";
                    //路由名
                    string routingKey1 = "*.orange.*";
                    string routingKey2 = "*.*.rabbit";
                    string routingKey3 = "lazy.#";
                    channel.QueueDeclare(queueName1, true, false, false);
                    channel.QueueDeclare(queueName2, true, false, false);

                    //把创建的队列绑定交换机,routingKey指定routingKey
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: routingKey1);
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey2);
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey3);
                    //向交换机写10条消息
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Direct {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "aaa.orange.rabbit", null, body);
                        channel.BasicPublish(exchangeName, routingKey: "lazy.aa.rabbit", null, body);
                        Console.WriteLine($"发送Topic消息:{message}");
                    }
                }
            }
        }

这里演示了 routingKey为aaa.orange.rabbit,和lazy.aa.rabbit的情况,第一个匹配到Q1和Q2,第二个匹配到Q2,所以应该Q1是10条,Q2有20条,

执行后看rabbitMQ界面:

(6)RPC模式

与上面其他5种所不同之处,该模式是拥有请求/回复的。也就是有响应的,上面5种都没有。

RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的处理业务,处理完后然后在A服务器继续执行下去,把异步的消息以同步的方式执行。

客户端(C)声明一个排他队列自己订阅,然后发送消息到RPC队列同时也把这个排他队列名也在消息里传进去,服务端监听RPC队列,处理完业务后把处理结果发送到这个排他队列,然后客户端收到结果,继续处理自己的逻辑。

RPC的处理流程:

  • 当客户端启动时,创建一个匿名的回调队列。
  • 客户端为RPC请求设置2个属性:replyTo:设置回调队列名字;correlationId:标记request。
  • 请求被发送到rpc_queue队列中。
  • RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
  • 客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了。

服务端代码

public class RPCServer
    {
        public static void RpcHandle()
        {

            var connection = RabbitMQHelper.GetConnection();
            {
                var channel = connection.CreateModel();
                {
                    string queueName = "rpc_queue";
                    channel.QueueDeclare(queue: queueName, durable: false,
                      exclusive: false, autoDelete: false, arguments: null);
                    channel.BasicQos(0, 1, false);
                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume(queue: queueName,
                      autoAck: false, consumer: consumer);
                    Console.WriteLine("【服务端】等待RPC请求...");

                    consumer.Received += (model, ea) =>
                    {
                        string response = null;

                        var body = ea.Body.ToArray();
                        var props = ea.BasicProperties;
                        var replyProps = channel.CreateBasicProperties();
                        replyProps.CorrelationId = props.CorrelationId;

                        try
                        {
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine($"【服务端】接收到数据:{ message},开始处理");
                            response = $"消息:{message},处理完成";
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine("错误:" + e.Message);
                            response = "";
                        }
                        finally
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                              basicProperties: replyProps, body: responseBytes);
                            channel.BasicAck(deliveryTag: ea.DeliveryTag,
                              multiple: false);
                        }
                    };
                }
            }
        }

    }

客户端

public class RPCClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        public RPCClient()
        {
            connection = RabbitMQHelper.GetConnection();

            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);

            props = channel.CreateBasicProperties();
            var correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId; //给消息id
            props.ReplyTo = replyQueueName;//回调的队列名,Client关闭后会自动删除

            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var response = Encoding.UTF8.GetString(body);
                //监听的消息Id和定义的消息Id相同代表这条消息服务端处理完成
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    respQueue.Add(response);
                }
            };

            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
        }

        public string Call(string message)
        {
            var messageBytes = Encoding.UTF8.GetBytes(message);
            //发送消息
            channel.BasicPublish(
                exchange: "",
                routingKey: "rpc_queue",
                basicProperties: props,
                body: messageBytes);
            //等待回复
            return respQueue.Take();
        }

        public void Close()
        {
            connection.Close();
        }
    }

执行代码

static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            //启动服务端,正常逻辑是在另一个程序
            RPCServer.RpcHandle();
            //实例化客户端
            var rpcClient = new RPCClient();
            string message = $"消息id:{new Random().Next(1, 1000)}";
            Console.WriteLine($"【客服端】RPC请求中,{message}");
            //向服务端发送消息,等待回复
            var response = rpcClient.Call(message);
            Console.WriteLine("【客服端】收到回复响应:{0}", response);
            rpcClient.Close();
            Console.ReadKey();
        }

测试效果:

z执行完,客服端close后,可以接着自己的下一步业务处理。

总结

以上便是RabbitMQ的6中模式在.net core中实际使用,其中(1)简单队列,(2)工作队列,(4)路由模式,(6)RPC模式的交换机类型都是direct,(3)发布订阅的交换机是fanout,(5)topics的交换机是topic。正常场景用的是direct,默认交换机也是direct类型的,推荐用(4)路由模式,因为指定交换机名比起默认的交换机会容易扩展场景,其他的交换机看业务场景所需使用。

下面位置可以看到交换机类型,amq.开头那几个是内置的,避免交换机过多可以直接使用。

到此这篇关于运用.net core中实例讲解RabbitMQ的文章就介绍到这了,更多相关.net core RabbitMQ内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • RabbitMQ 如何解决消息幂等性的问题

    前言 关于MQ消费者的幂等性问题,在于MQ的重试机制,因为网络原因或客户端延迟消费导致重复消费.使用MQ重试机制需要注意的事项以及如何解决消费者幂等性问题以下将逐一讲解. 1. RabbitMQ自动重试机制 消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这个时候我们如何处理? 使用重试机制,RabbitMQ默认开启重试机制. 实现原理: @RabbitHandler注解 底层使用Aop拦截,如果程序(消费者)没有抛出异常,自动提交事务 如果Aop使用异常通知拦截获取到异常后,自动实现补

  • RabbitMQ 的七种队列模式和应用场景

    七种模式介绍与应用场景 简单模式(Hello World) 做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B 应用场景:将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人 工作队列模式(Work queues) 在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理 应用场景:一个订单的处理需要10s,有多个订单可以同时放到消息队列

  • 如何用.NETCore操作RabbitMQ

    什么是RabbitMQ? RabbitMQ是由erlang语言开发的一个基于AMQP(Advanced Message Queuing Protocol)协议的企业级消息队列中间件.可实现队列,订阅/发布,路由,通配符等工作模式. 为什么要使用RabbitMQ? 异步处理:比如发送邮件,发送短信等不需要等待处理结果的操作 应用解耦:比如下单成功后,通知仓库发货,不需要等待仓库回应,通过消息队列去通知仓库,降低应用间耦合程序,可并行开发两个功能模块 流量削锋:在抢购或者其他的活动页,服务处于爆发式

  • 一篇文章带你从入门到精通:RabbitMQ

    目录 1. 浅浅道来 1.1 什么是中间件? 1.1.1 分布式的概念(补充) 1.2 什么是消息中间件/消息队列(MQ) 1.2.1 消息队列应用场景 1.3 什么是 RabbitMQ 2. 下载与安装 2.1 手动安装 2.1.1 下载安装过程 2.1.2 配置 Web 界面管理 2.1.3 简单介绍 Web 界面管理 2.2 Docker 安装 2.2.1 配置 yum 2.2.2 安装 docker 2.2.3 安装 RabbitMQ (任选其一) 3. RabbitMQ 协议和模型 3

  • .Net RabbitMQ实现HTTP API接口调用

    RabbitMQ Management插件还提供了基于RESTful风格的HTTP API接口来方便调用.一共涉及4种HTTP方法:GET.PUT.DELETE和POST.GET方法一般用来获取如集群.节点.队列.交换器等信息.PUT方法用来创建资源,如交换器.队列之类的.DELETE方法用来删除资源.POST方法也是用来创建资源的,与PUT不同的是,POST创建的是无法用具体名称的资源.比如绑定关系(bindings)和发布消息(publish)无法指定一个具体的名称. 点击Web管理界面左下

  • 运用.net core中实例讲解RabbitMQ高可用集群构建

    目录 一.集群架构简介 二.普通集群搭建 2.1 各个节点分别安装RabbitMQ 2.2 把节点加入集群 2.3 代码演示普通集群的问题 三.镜像集群 四.HAProxy环境搭建. 五.KeepAlived 环境搭建 一.集群架构简介 当单台 RabbitMQ 服务器的处理消息的能力达到瓶颈时,此时可以通过 RabbitMQ 集群来进行扩展,从而达到提升吞吐量的目的.RabbitMQ 集群是一个或多个节点的逻辑分组,集群中的每个节点都是对等的,每个节点共享所有的用户,虚拟主机,队列,交换器,绑

  • 运用.net core中实例讲解RabbitMQ

    目录 一.RabbitMQ简介 (1) AMQP协议 (2)AMQP专业术语 (3)RabbitMQ整体架构 二.安装RabbitMQ 三.RabbitMQ六种队列模式在.NetCore中使用 (1)简单队列 (2)工作队列模式 (3)发布订阅模式 (4)路由模式(推荐使用) (5)主题模式 (6)RPC模式 总结 一.RabbitMQ简介 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang(高并发语言)语言来编写的,并且Rabbi

  • 运用.NetCore实例讲解RabbitMQ死信队列,延时队列

    目录 一.死信队列 二.延时队列 三.延时消息设置不同过期时间 四.延时消息用延时插件的方式实现 一.死信队列 描述:Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2) P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息Q2的消息. 特定情况有哪些呢: 1.消息被拒(basic.

  • python队列通信:rabbitMQ的使用(实例讲解)

    (一).前言 为什么引入消息队列? 1.程序解耦 2.提升性能 3.降低多业务逻辑复杂度 (二).python操作rabbit mq rabbitmq配置安装基本使用参见上节文章,不再复述. 若想使用python操作rabbitmq,需安装pika模块,直接pip安装: pip install pika 1.最简单的rabbitmq producer端与consumer端对话: producer: #Author :ywq import pika auth=pika.PlainCredentia

  • angular2组件中定时刷新并清除定时器的实例讲解

    实例如下: import { Component,OnInit,ChangeDetectionStrategy,ChangeDetectorRef,OnDestroy} from "@angular/core"; @Component({ changeDetection:ChangeDetectionStrategy.OnPush }) export class xxxComponent{ private timer; constructor(private ref : ChangeD

  • 实例讲解Java的Spring框架中的控制反转和依赖注入

    近来总是接触到 IoC(Inversion of Control,控制反转).DI(Dependency Injection,依赖注入)等编程原则或者模式,而这些是著名 Java 框架 Spring.Struts 等的核心所在.针对此查了 Wikipedia 中各个条目,并从图书馆借来相关书籍,阅读后有些理解,现结合书中的讲解以及自己的加工整理如下: eg1 问题描述: 开发一个能够按照不同要求生成Excel或 PDF 格式的报表的系统,例如日报表.月报表等等.   解决方案: 根据"面向接口编

  • js中变量的连续赋值(实例讲解)

    今天遇到了一个连续赋值的经典案例,网友们给出的答案也是五花八门,看起来有些繁琐,我也来说说自己的看法. 下面就是这个经典案例: var a = {n: 1}: var b = a; a.x = a = {n: 2}: console.log(a); console.log(b); console.log(a.x); console.log(b.x): 我们先来看一下普通连续赋值,即:变量赋值的类型是数据类型值 var a=3; var b=a=5; console.log(a); console

  • 基于多线程中join()的用法实例讲解

    Thread中,join()方法的作用是调用线程等待该线程完成后,才能继续用下运行. public class TestThread5 { public static void main(String[] args) throws InterruptedException { Runner0 run5 = new Runner0(); Thread th5 = new Thread(run5); th5.start(); th5.join();//join()方法用在此处是为了等待主线程结束后运

  • java求两个数中的大数(实例讲解)

    java中的max函数在Math中 应用如下: int a=34: int b=45: int ans=Math.max(34,45); 那么ans的值就是45. 以上这篇java求两个数中的大数(实例讲解)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们.

  • Spring MVC中自定义拦截器的实例讲解

    1. 引言 拦截器(Interceptor)实现对每一个请求处理前后进行相关的业务处理,类似于Servlet的Filter. 我们可以让普通的Bean实现HandlerIntercpetor接口或继承HandlerInterceptorAdapter类来实现自定义拦截器. 通过重写WebMvcConfigurerAdapter的addIntercetors方法来注册一个计算每一次请求的处理时间的拦截器. 2. 自定义拦截器的实现 2.1 定义拦截器 新建LogInterceptor类,并继承Ha

随机推荐