RocketMQ消息队列实现随机消息发送当做七夕礼物

目录
  • 正文
  • 1 下载并启动RocketMQ
    • 1.1 首先启动name server
    • 1.2 然后启动Broker
  • 2 生产者
  • 3 消费者

正文

都在过情人节,前端的小哥哥们给女朋友画个页面,美美的,写个chrome插件,好看的,俺们后端程序员咋办。

我给媳妇写首诗,哈哈

我决定,把想对媳妇说的,今天发送到一个MQ里边,然后在七夕当天,打开消费者,将这一段话给俺媳妇看。你看,这就是我好久前对你说的话,这就是我们后端程序员的浪漫。当然也可以多发送几个,到时候跟根据topic控制到底发什么,哈哈。

这里首先得用顺序消息,当然,消息过期时间得设置的长一点。

1 下载并启动RocketMQ

点击下载,这是个windows版本的。

下载完成解压后长这样:

然后后还需要配置环境变量

这个时候就可以进入到RocketMQ的bin目录启动MQ了

1.1 首先启动name server

start mqnamesrv.cmd

1.2 然后启动Broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

这个时候就启动成功了

2 生产者

需要注意的是,消息必须是顺序消息,不然发送的消息顺序就乱了。一首情诗顺序乱了,读不下去,岂不是很尴尬。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class RocketMQOrderProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String[] tags = new String[]{"TagA", "TagC", "TagD"};
        //读取文件
        List<String> messages = getMessages();
        for (int i = 0; i < messages.size(); i++) {
            String body = messages.get(i);
            Message msg = new Message("topic_luke", tags[i % tags.length], "KEY" + i, body.getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = ((Integer)arg).longValue();
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, i);
        }
        producer.shutdown();
    }
    static List<String> getMessages() throws IOException {
        String temp = null;
        File f = new File("E:\Code\online-taxi-three\demo\luke.txt");
        InputStreamReader read = new InputStreamReader(new FileInputStream(f));
        ArrayList readList = new ArrayList();
        BufferedReader reader = new BufferedReader(read);
        while ((temp = reader.readLine()) != null && !"".equals(temp)) {
            readList.add(temp);
        }
        return readList;
    }
}

3 消费者

这里需要注意的是setConsumeThreadMaxsetConsumeThreadMin都需要设置成1,单线程取消息这样就可以通过sleep控制消息的显示速度,不然并发取消息就很快显示完了。不够唯美。

import lombok.SneakyThrows;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class RockerMQConsumer {
    public static void main(String[] args) throws Exception {
        //实例化消息消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
        //指定nameserver地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeThreadMin(1);
        consumer.setPullBatchSize(1);
        //订阅topic
        consumer.subscribe("topic_luke","*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @SneakyThrows
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                    TimeUnit.SECONDS.sleep(3);
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

发送的内容在这里自由编写哈,路径和文件名能对上就行,谢谢观看,最近突发奇想,把技术编成故事讲出来,会不会比较受欢迎呢。

以上就是RocketMQ消息队列实现随机消息发送当做七夕礼物的详细内容,更多关于RocketMQ消息队列随机消息的资料请关注我们其它相关文章!

(0)

相关推荐

  • 使用Kotlin+RocketMQ实现延时消息的示例代码

    一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 使用延时消息的典型场景,例如: 在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消. 在电商系统中,用户七天内没有评价商品,则默认好评. 这些场景对应的解决方案,包括: 轮询遍历数据库记录 JDK 的 DelayQueue ScheduledExecutorService 基于 Quartz 的定时任务 基于 Redis 的 zset 实现延时队列. 除此之外,

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • kafka rabbitMQ及rocketMQ队列的消息可靠性保证分析

    目录 1.消息丢失 1.生产者发送失败 2.消费者消费失败 3.队列因为自身体原因丢失数据 2.消息顺序 1.kafka 2.rocketMQ 3.rabbitMQ 3.消息重复 1.消息丢失 1.生产者发送失败 所有消息队列都可能发生的问题 生产者发送消息后,队列未成功接收(网络原因或其他)而生产者不知情,消息丢失 生产者发送消息后,队列接收成功->生产者确认,但消息并未持久化,队列崩溃,消息丢失 针对这类问题,三种消息队列都提供了生产者消息发送确认的模式,例如将kafka的acks参数设置为

  • java开发线上事故理解RocketMQ异步精髓

    目录 引言 1 业务场景 2 线程池模式 3 本地内存 + 定时任务 4 MQ 模式 5 Agent 服务 + MQ 模式 6 总结 第一层:什么场景下需要异步 第二层:异步的外功心法 第三层:异步的本质 引言 在高并发的场景下,异步是一个极其重要的优化方向. 前段时间,生产环境发生一次事故,笔者认为事故的场景非常具备典型性 . 写这篇文章,笔者想和大家深入探讨该场景的架构优化方案.希望大家读完之后,可以对异步有更深刻的理解. 1 业务场景 老师登录教研平台,会看到课程列表,点击课程后,课程会以

  • RocketMQ消息队列实现随机消息发送当做七夕礼物

    目录 正文 1 下载并启动RocketMQ 1.1 首先启动name server 1.2 然后启动Broker 2 生产者 3 消费者 正文 都在过情人节,前端的小哥哥们给女朋友画个页面,美美的,写个chrome插件,好看的,俺们后端程序员咋办. 我给媳妇写首诗,哈哈 我决定,把想对媳妇说的,今天发送到一个MQ里边,然后在七夕当天,打开消费者,将这一段话给俺媳妇看.你看,这就是我好久前对你说的话,这就是我们后端程序员的浪漫.当然也可以多发送几个,到时候跟根据topic控制到底发什么,哈哈. 这

  • Python中线程的MQ消息队列实现以及消息队列的优点解析

    "消息队列"是在消息的传输过程中保存消息的容器.消息队列管理器在将消息从它的源中继到它的目标时充当中间人.队列的主要目的是提供路由并保证消息的传递:如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它.相信对任何架构或应用来说,消息队列都是一个至关重要的组件,下面是十个理由: Python的消息队列示例: 1.threading+Queue实现线程队列 #!/usr/bin/env python import Queue import threading import

  • php基于Redis消息队列实现的消息推送的方法

    基本知识点 重点用到了以下命令实现我们的消息推送 brpop 阻塞模式 从队列右边获取值之后删除 brpoplpush 从队列A的右边取值之后删除,从左侧放置到队列B中 逻辑分析 在普通的任务脚本中写入push_queue队列要发送消息的目标,并为目标设置一个要推送的内容,永不过期 RedisPushQueue中brpoplpush处理,处理后的值放到temp_queue,主要防止程序崩溃造成推送失败 RedisAutoDeleteTempqueueItems处理temp_queue,这里用到了

  • 解决RabbitMq消息队列Qos Prefetch消息堵塞问题

    mq是实现代码扩展的有利手段,个人喜欢用概念来学习新知识,介绍堵塞问题的之前,先来段概念的学习. ConnectionFactory:创建connection的工厂类 Connection: 简单理解为socket Channel:和mq交互的接口,定义queue.exchange和绑定queue.exhange等接口都是它. 接下来就是和mq的交互类 exchange:简单地看成路由,类型不是重点,看看官网即可 queue:客户端监听的是queue,而不是exchange,但是使用queue的

  • RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    目录 消息队列常见问题处理 分布式事务 什么是分布式事务 常见的分布式事务解决方案 基于MQ实现的分布式事务 本地消息表-最终一致性 MQ事务-最终一致性 RocketMQ中如何处理事务 Kafka中如何处理事务 RabbitMQ中的事务 消息防丢失 生产阶段防止消息丢失 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ中的防丢失措施使用SYNC的发送消息方式,等待broker处理结果 存储阶段 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ

  • C++消息队列(定义,结构,如何创建,发送与接收)

    目录 一.定义 二.结构 三.消息队列的创建 四.消息队列的发送与接收 五.小结 一.定义 1.消息队列是一种先进先出的队列型数据结构,实际上是系统内核中的一个内部链表.消息被顺序插入队列中,其中发送进程将消息添加到队列末尾,接受进程从队列头读取消息.2.多个进程可同时向一个消息队列发送消息,也可以同时从一个消息队列中接收消息.发送进程把消息发送到队列尾部,接受进程从消息队列头部读取消息,消息一旦被读出就从队列中删除. 二.结构 1.消息队列中消息本身由消息类型和消息数据组成,通常使用如下结构:

  • Java中消息队列任务的平滑关闭详解

    前言 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 消息队列应用场景 消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景. 本文主要给大家介绍的是关于Java中消息队列任务平滑关闭的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 1.问题背

  • 进程间通信之深入消息队列的详解

    最近在Hi3515上调试Qt与DVR程序,发现他们之间使用消息队列通信的,闲暇之余,就总结了一下消息队列,呵呵,自认为通俗易懂,同时,在应用中也发现了消息队列的强大之处. 关于线程的管理(互斥量和条件变量)见:Linux线程管理必备:解析互斥量与条件变量的详解 一.消息队列的特点 1.消息队列是消息的链表,具有特定的格式,存放在内存中并由消息队列标识符标识.    2.消息队列允许一个或多个进程向它写入与读取消息.    3.管道和命名管道都是通信数据都是先进先出的原则.    4.消息队列可以

  • 浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式;2.并行方式 a.串

  • Java实现Redis延时消息队列

    目录 什么是延时任务 延时任务的特点 实现思路: 代码实现 1.消息模型 2.RedisMq 消息队列实现类 3.消息生产者 4.消息消费者 5. 消息执接口 6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求 什么是延时任务 延时任务,顾名思义,就是延迟一段时间后才执行的任务.举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了.只要在前一天下班前指定第二天要发送资讯的时间,到了第二天

随机推荐