RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

目录
  • RocketMq消息处理
  • 1. 处理PULL_MESSAGE请求
  • 2. 获取消息
  • 3. 挂起请求:PullRequestHoldService#suspendPullRequest
    • 3.1 处理挂起请求的线程:PullRequestHoldService
    • 3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup
    • 3.3 消息分发中唤醒consumer请求
  • 总结

RocketMq消息处理

RocketMq消息处理整个流程如下:

本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

  • 消息接收:消息接收是指接收producer的消息,处理类是SendMessageProcessor,将消息写入到commigLog文件后,接收流程处理完毕;
  • 消息分发:broker处理消息分发的类是ReputMessageService,它会启动一个线程,不断地将commitLong分到到对应的consumerQueue,这一步操作会写两个文件:consumerQueueindexFile,写入后,消息分发流程处理 完毕;
  • 消息投递:消息投递是指将消息发往consumer的流程,consumer会发起获取消息的请求,broker收到请求后,调用PullMessageProcessor类处理,从consumerQueue文件获取消息,返回给consumer后,投递流程处理完毕。

以上就是rocketMq处理消息的流程了,接下来我们就从源码来分析消息投递的实现。

1. 处理PULL_MESSAGE请求

producer不同,consumerbroker拉取消息时,发送的请求codePULL_MESSAGEprocessorPullMessageProcessor,我们直接进入它的processRequest方法:

@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    // 调用方法
    return this.processRequest(ctx.channel(), request, true);
}

这个方法就只是调用了一个重载方法,多出来的参数true表示允许broker挂起请求,我们继续,

/**
 * 继续处理
 */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
        boolean brokerAllowSuspend)throws RemotingCommandException {
    RemotingCommand response = RemotingCommand
        .createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader
        = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader)
        request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
    response.setOpaque(request.getOpaque());
    // 省略权限校验流程
    // 1. rocketMq 可以设置校验信息,以阻挡非法客户端的连接
    // 2. 同时,对topic可以设置DENY(拒绝)、ANY(PUB 或者 SUB 权限)、PUB(发送权限)、SUB(订阅权限)等权限,
    //    可以细粒度控制客户端对topic的操作内容
    ...
    // 获取订阅组
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager()
        .findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    ...
    // 获取订阅主题
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager()
        .selectTopicConfig(requestHeader.getTopic());
    ...
    // 处理filter
    // consumer在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种:tag与sql92
    // 这里我们重点关注拉取消息的流程,具体的过滤细节后面再分析
    ...
    // 获取消息
    // 1. 根据 topic 与 queueId 获取 ConsumerQueue 文件
    // 2. 根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容
    final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
        requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    if (getMessageResult != null) {
        // 省略一大堆的校验过程
        ...
        switch (response.getCode()) {
            // 表示消息可以处理,这里会把消息内容写入到 response 中
            case ResponseCode.SUCCESS:
                ...
                // 处理消息消息内容,就是把消息从 getMessageResult 读出来,放到 response 中
                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                    final long beginTimeMills = this.brokerController.getMessageStore().now();
                    // 将消息内容转为byte数组
                    final byte[] r = this.readGetMessageResult(getMessageResult,
                        requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                        requestHeader.getQueueId());
                    ...
                    response.setBody(r);
                } else {
                    try {
                        // 消息转换
                        FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(
                            getMessageResult.getBufferTotalSize()), getMessageResult);
                        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                            ...
                        });
                    } catch (Throwable e) {
                        ...
                    }
                    response = null;
                }
                break;
            // 未找到满足条件的消息
            case ResponseCode.PULL_NOT_FOUND:
                // 如果支持挂起,就挂起当前请求
                if (brokerAllowSuspend && hasSuspendFlag) {
                    ...
                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData,
                        messageFilter);
                    // 没有找到相关的消息,挂起操作
                    this.brokerController.getPullRequestHoldService()
                        .suspendPullRequest(topic, queueId, pullRequest);
                    response = null;
                    break;
                }
            // 省略其他类型的处理
            ...
                break;
            default:
                assert false;
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store getMessage return null");
    }
    ...
    return response;
}

在源码中,这个方法也是非常长,这里我抹去了各种细枝末节,仅留下了一些重要的流程,整个处理流程如下:

  • 权限校验:rocketMq 可以设置校验信息,以阻挡非法客户端的连接,同时也可以设置客户端的发布、订阅权限,细节度控制访问权限;
  • 获取订阅组、订阅主题等,这块主要是通过请求消息里的内容获取broker中对应的记录
  • 创建过滤组件:consumer在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种:tagsql92
  • 获取消息:先是根据 topicqueueId 获取 ConsumerQueue 文件,根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容,消息的过滤操作也是发生在这一步
  • 转换消息:如果获得了消息,就是把具体的消息内容,复制到reponse
  • 挂起请求:如果没获得消息,而当前请求又支持挂起,就挂起当前请求

以上代码还是比较清晰的,相关流程代码中都作了注释。

以上流程就是整个消息的获取流程了,在本文中,我们仅关注与获取消息相关的步骤,重点关注以下两个操作:

  • 获取消息
  • 挂起请求

2. 获取消息

获取消息的方法为DefaultMessageStore#getMessage,代码如下:

public GetMessageResult getMessage(final String group, final String topic, final int queueId,
        final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
    // 省略一些判断
    ...
    // 根据topic与queueId一个ConsumeQueue,consumeQueue记录的是消息在commitLog的位置
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        minOffset = consumeQueue.getMinOffsetInQueue();
        maxOffset = consumeQueue.getMaxOffsetInQueue();
        if (...) {
            // 判断 offset 是否符合要求
            ...
        } else {
            // 从 consumerQueue 文件中获取消息
            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
            if (bufferConsumeQueue != null) {
                ...
                for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount;
                    i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 省略一大堆的消息过滤操作
                    ...
                    // 从 commitLong 获取消息
                    SelectMappedBufferResult selectResult
                            = this.commitLog.getMessage(offsetPy, sizePy);
                    if (null == selectResult) {
                        if (getResult.getBufferTotalSize() == 0) {
                            status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                        }
                        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                        continue;
                    }
                    // 省略一大堆的消息过滤操作
                    ...
                }
            }
    } else {
        status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    }
    if (GetMessageStatus.FOUND == status) {
        this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
    } else {
        this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
    }
    long elapsedTime = this.getSystemClock().now() - beginTime;
    this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
    getResult.setStatus(status);
    // 又是处理 offset
    getResult.setNextBeginOffset(nextBeginOffset);
    getResult.setMaxOffset(maxOffset);
    getResult.setMinOffset(minOffset);
    return getResult;
}

这个方法不是比较长的,这里仅保留了关键流程,获取消息的关键流程如下:

  • 根据topicqueueId找到ConsumerQueue
  • ConsumerQueue对应的文件中获取消息信息,如taghashCode、消息在commitLog中的位置信息
  • 根据位置信息,从commitLog中获取完整的消息

经过以上步骤,消息就能获取到了,不过在获取消息的前后,会进行消息过滤操作,即根据tagsql语法来过滤消息,关于消息过滤的一些细节,我们留到后面消息过滤相关章节作进一步分析。

3. 挂起请求:PullRequestHoldService#suspendPullRequest

broker无新消息时,consumer拉取消息的请求就会挂起,方法为PullRequestHoldService#suspendPullRequest

public class PullRequestHoldService extends ServiceThread {
    private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
        new ConcurrentHashMap<String, ManyPullRequest>(1024);
    public void suspendPullRequest(final String topic, final int queueId,
            final PullRequest pullRequest) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (null == mpr) {
            mpr = new ManyPullRequest();
            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
            if (prev != null) {
                mpr = prev;
            }
        }
        mpr.addPullRequest(pullRequest);
    }
    ...
}

suspendPullRequest方法中,所做的工作仅是把当前请求放入pullRequestTable中了。从代码中可以看到,pullRequestTable是一个ConcurrentMapkeytopic@queueIdvalue 就是挂起的请求了。

请求挂起后,何时处理呢?这就是PullRequestHoldService线程的工作了。

3.1 处理挂起请求的线程:PullRequestHoldService

看完PullRequestHoldService#suspendPullRequest方法后,我们再来看看PullRequestHoldService

PullRequestHoldServiceServiceThread的子类(上一次看到ServiceThread的子类还是ReputMessageService),它也会启动一个新线程来处理挂起操作。

我们先来看看它是在哪里启动PullRequestHoldService的线程的,在BrokerController的启动方法start()中有这么一行:

BrokerController#start

public void start() throws Exception {
    ...
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }
    ...
}

这里就是启动pullRequestHoldService的线程操作了。

为了探究这个线程做了什么,我们进入PullRequestHoldService#run方法:

@Override
public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            // 等待中
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(
                    this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            // 检查操作
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info("{} service end", this.getServiceName());
}

从代码来看,这个线程先是进行等待,然后调用PullRequestHoldService#checkHoldRequest方法,看来关注就是这个方法了,它的代码如下:

private void checkHoldRequest() {
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            final long offset = this.brokerController.getMessageStore()
                .getMaxOffsetInQueue(topic, queueId);
            try {
                // 调用notifyMessageArriving方法操作
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error(...);
            }
        }
    }
}

这个方法调用了PullRequestHoldService#notifyMessageArriving(...),我们继续进入:

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
    // 继续调用
    notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}
/**
 * 这个方法就是最终调用的了
 */
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset,
    final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            List<PullRequest> replayList = new ArrayList<PullRequest>();
            for (PullRequest request : requestList) {
                // 判断是否有新消息到达,要根据 comsumerQueue 的偏移量与request的偏移量判断
                long newestOffset = maxOffset;
                if (newestOffset <= request.getPullFromThisOffset()) {
                    newestOffset = this.brokerController.getMessageStore()
                        .getMaxOffsetInQueue(topic, queueId);
                }
                if (newestOffset > request.getPullFromThisOffset()) {
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    if (match && properties != null) {
                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }
                    if (match) {
                        try {
                            // 唤醒操作
                            this.brokerController.getPullMessageProcessor()
                                .executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                }
                // 超时时间到了
                if (System.currentTimeMillis() >=
                        (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        // 唤醒操作
                        this.brokerController.getPullMessageProcessor()
                            .executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }
                replayList.add(request);
            }
            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}

这个方法就是用来检查是否有新消息送达的操作了,方法虽然有点长,但可以用一句话来总结:如果有新消息送达,或者pullRquest hold住的时间到了,就唤醒pullRquest(即调用PullMessageProcessor#executeRequestWhenWakeup方法)。

  • 在判断是否有新消息送达时,会获取comsumerQueue文件中的最大偏移量,与当前pullRquest中的偏移量进行比较,如果前者大,就表示有新消息送达了,需要唤醒pullRquest
  • 前面说过,当consumer请求没获取到消息时,brokerhold这个请求一段时间(30s),当这个时间到了,也会唤醒pullRquest,之后就不会再hold住它了

3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup

我们再来看看 PullMessageProcessor#executeRequestWhenWakeup 方法:

public void executeRequestWhenWakeup(final Channel channel,
    final RemotingCommand request) throws RemotingCommandException {
    // 关注 Runnable#run() 方法即可
    Runnable run = new Runnable() {
        @Override
        public void run() {
            try {
                // 再一次调用 PullMessageProcessor#processRequest(...) 方法
                final RemotingCommand response = PullMessageProcessor.this
                    .processRequest(channel, request, false);
                ...
            } catch (RemotingCommandException e1) {
                log.error("excuteRequestWhenWakeup run", e1);
            }
        }
    };
    // 提交任务
    this.brokerController.getPullMessageExecutor()
        .submit(new RequestTask(run, channel, request));
}

这个方法准备了一个任务,然后将其提交到线程池中执行,任务内容很简单,仅是调用了PullMessageProcessor#processRequest(...) 方法,这个方法就是本节一始提到的处理consumer拉取消息的方法了。

3.3 消息分发中唤醒consumer请求

在分析消息分发流程时,DefaultMessageStore.ReputMessageService#doReput方法中有这么一段:

private void doReput() {
    ...
    // 分发消息
    DefaultMessageStore.this.doDispatch(dispatchRequest);
    // 长轮询:如果有消息到了主节点,并且开启了长轮询
    if (BrokerRole.SLAVE != DefaultMessageStore.this
            .getMessageStoreConfig().getBrokerRole()
            &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
        // 调用NotifyMessageArrivingListener的arriving方法
        DefaultMessageStore.this.messageArrivingListener.arriving(
            dispatchRequest.getTopic(),
            dispatchRequest.getQueueId(),
            dispatchRequest.getConsumeQueueOffset() + 1,
            dispatchRequest.getTagsCode(),
            dispatchRequest.getStoreTimestamp(),
            dispatchRequest.getBitMap(),
            dispatchRequest.getPropertiesMap());
    }
    ...
}

这段就是用来主动唤醒hold住的consumer请求的,我们进入NotifyMessageArrivingListener#arriving方法:

 @Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
        msgStoreTime, filterBitMap, properties);
}

最终它也是调用了 PullRequestHoldService#notifyMessageArriving(...) 方法。

总结

本文主要分析了broker处理PULL_MESSAGE请求的流程,总结如下:

  • broker处理PULL_MESSAGEprocessorPullMessageProcessorPullMessageProcessorprocessRequest(...)就是整个消息获取流程了
  • broker在获取消息时,先根据请求的topicqueueId找到consumerQueue,然后根据请求中的offset参数从consumerQueue文件中找到消息在commitLog的位置信息,最后根据位置信息从commitLog中获取消息内容
  • 如果broker中没有当前consumerQueue的消息,broker会挂起当前线程,直到超时(默认30s)或收到新的消息时再唤醒

参考

RocketMQ源码分析专栏

以上就是RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析的详细内容,更多关于RocketMQ broker 消息投递的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ消息生产者是如何选择Broker示例详解

    目录 前言 从NameServer查询Topic信息 如何选择Broker 小结 前言 在RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢? 从NameServer查询Topic信息 通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sen

  • RocketMQ生产者如何规避故障Broker方式详解

    目录 前言 收集故障Broker 选择Broker 小结 前言 在消息发送过程中,生产者从NameServer中获取到了指定Topic对应的Broker信息,在同步发送消息的代码中,如果消息发送失败,生产者默认是会重试两次的.那么Broker有问题的情况下,无论重试多少次都是没有意义的,消息生产者是如何规避这些故障Broker的呢? 收集故障Broker 我们在所有的发送消息源码中都可以找到这样一段代码,可在DefaultMQProducerImpl类中查找: this.updateFaultI

  • RocketMQ特性Broker存储事务消息实现

    目录 引言 TransactionalMessageService 处理事务消息 第一处: 第二处: 引言 在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的. private void initialTransaction() { this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE

  • RocketMQ broker文件清理源码解析

    目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2. 源码解析 2.1 清理commitlog 2.2 ConsumeQueue 清理 2.3 indexFile 清理 3. 总结 1. broker 清理文件介绍 本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈 1.1 哪些文件需要清理 首先我们需要介绍下在RocketMQ中哪些文件需要清理,其实可以想一想,在RocketMQ

  • RocketMQ4.5.2 修改mqnamesrv 和 mqbroker的日志路径操作

    此解决方案是针对window的,因为日志默认保存路径在C盘,linux忽略. 学习RocketMQ过程中,总是出现 com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now, maybe disk full, CL: 0.87 CQ: 0.87 INDEX: 0.87, maybe your broker machine memory too small. 这

  • RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

    目录 RocketMq消息处理 1. 处理PULL_MESSAGE请求 2. 获取消息 3. 挂起请求:PullRequestHoldService#suspendPullRequest 3.1 处理挂起请求的线程:PullRequestHoldService 3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup 3.3 消息分发中唤醒consumer请求 总结 RocketMq消息处理 RocketMq消息处理整个流程如下: 本系列Roc

  • RocketMq事务消息发送代码流程详解

    一.RocketMq事务消息流程: 1.首先会向broker发送一个预请求消息,消费者不可见 2.回调执行本地事务(比如操作数据库) 3.事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见.如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功. 二.RocketMq事务消息实例: 1.引入rocketMq相关的依赖: <dependency> <groupId>org.apache.ro

  • Springboot详解RocketMQ实现消息发送与接收流程

    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送.同步发送和异步发送. 下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收 创建Springboot项目,添加rockermq 依赖 <!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-

  • RocketMQ事务消息保证消息的可靠性和一致性

    这篇讲解一下rocketMq的事务消息的原理 在发送事务消息的时候,会加一个标识,表示这个消息是事务消息.broker接收到消息后,在我们之前看的代码里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage会判断是否是事务消息. if (sendTransactionPrepareMessage) { asyncPutMessageFuture = this.brokerController.getTransac

  • RocketMQ事务消息原理与使用详解

    目录 一.RocketMQ事务消息概要 二.RocketMQ事务消息使用案例 (1).定义消息监听器 (2).定义消息生产者 (3).定义消息消费者 (4).观察生产者控制台输出 (5).观察消费者控制台输出 三.RocketMQ事务消息原理 四.RocketMQ事务消息使用限制 一.RocketMQ事务消息概要 RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ的事务消息提供类

  • RocketMQ普通消息实战演练详解

    目录 引言 普通消息同步发送 普通消息异步发送 普通消息单向发送 集群消费模式 广播消费模式 引言 之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV. 相关的配置,安装和启动在这篇文章有相关讲解  https://www.jb51.net/article/260237.htm 普通消息同步发送 同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息.这个方式可以确保消息发送到Broker成功,一些重要的

  • RocketMQ延迟消息简明介绍

    目录 前言 核心属性 RMQ_SYS_SCHEDULE_TOPIC FIRST_DELAY_TIME DELAY_FOR_A_WHILE DELAY_FOR_A_PERIOD delayLevelTable offsetTable 核心方法 queueId2DelayLevel delayLevel2QueueId updateOffset computeDeliverTimestamp start() shutdown() load() parseDelayLevel 前言 场景可以是这样的,

  • RocketMQ延迟消息超详细讲解

    目录 一.什么是延时消息 二.延时消息等级 三.延时消息使用场景 四.延时消息示例 五.延时消息实现原理 一.什么是延时消息 当消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息,称为延时消息. 二.延时消息等级 RocketMQ延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的.默认支持18个等级的延迟消息,延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中: // MessageStoreConf

  • WM_CLOSE、WM_DESTROY、WM_QUIT及各种消息投递函数详解

    本文对WM_CLOSE.WM_DESTROY.WM_QUIT及各种消息投递函数的功能及区别做出了分析比对,有助于读者更好的对消息投递函数加以理解.详情如下: 一.WM_CLOSE.WM_DESTROY.WM_QUIT区别 WM_CLOSE:关闭应用程序窗口 WM_DESTROY:关闭应用程序 WM_QUIT:关闭消息循环 只有关闭了消息循环,应用程序的进程才真正退出(在任务管理器里消失). win32应用程序的完整退出过程:点击窗口右上角的关闭按钮,发送WM_CLOSE消息.此消息处理中调用De

  • Spring Boot 整合RocketMq实现消息过滤功能

    目录 简介 根据TAG过滤消息 生产者 消费者 测试结果 根据SQL表达式过滤消息 生产者 消费者 启动程序报错The broker does not support consumer to filter message by SQL92 测试结果 总结 简介 消息过滤是指消费者一端在消费消息时,对消息进行选择性过滤,只消费符合过滤条件的消息. RocketMQ的消息过滤机制大致分为两种:标签过滤和类过滤.其中标签过滤又分为Tag过滤和SQL92过滤. 根据TAG过滤消息 消息发送端只能设置一个

随机推荐