详解RocketMQ 消费端如何监听消息

目录
  • 前言
  • 流程地图
  • 源码跟踪
    • 核心模块(消息拉取)
    • 拉取流程
    • 拉取消息处理
      • 当pullStatus为FOUND,消息进行提交消费的请求
    • 消息消费进度提交
  • 总结

前言

上一篇文章中我们主要来看RocketMQ消息消费者是如何启动的,

那他有一个步骤是非常重要的,就是启动消息的监听,通过不断的拉取消息,来实现消息的监听,那具体怎么做,让我们我们跟着源码来学习一下~

流程地图

源码跟踪

这一块的代码比较多,我自己对关键点的一些整理,这个图我画的不是很OK

核心模块(消息拉取)

入口:this.pullMessageService.start();

  • 执行线程池run方法,轮流从pullRequestQueue中获取PullRequest

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

声明一个阻塞队列用来存放 PullRequest 对象

PullRequest 用于消息拉取任务,如果 pullRequestQueue 为空则会阻塞,直到拉取任务被放入

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

将 stopped 用volatile来修饰,每次执行的时候都检测stopped的状态,线程只要修改了这个状态,其余线程就会马上知道

protected volatile boolean stopped = false;
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    // 判断启动状态
    while (!this.isStopped()) {
        try {
            // 取出一个PullRequest对象
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
  • 获取消费队列快照,判断状态是否正常,同时更新最后一次拉取时间

PullMessageService 从消息服务器默认拉取32条消息,按消息的偏移量顺序存放在 ProcessQueue 队列

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

入口:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

// 获取消费队列快照
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
    log.info("the pull request[{}] is dropped.", pullRequest.toString());
    return;
}
// 设置最后一次拉取时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
  • 校验客户端运行状态
// 校验状态
this.makeSureStateOK();
private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The consumer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}

如果消费者状态不正确,则抛出异常,启动定时线程池过段时间回收 PullRequest 对象,以便pullMessageService能及时唤醒并再次执行消息拉取,这个逻辑在多个地方使用到了

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    if (!isStopped()) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    } else {
        log.warn("PullMessageServiceScheduledThread has shutdown");
    }
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        // 最后将pullRequest放入pullRequestQueue中
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}
  • 校验消费队列中的消息数量和大小是否符合设置

如果触发流量控制,则延迟拉取消息,先将 PullRequest 对象进行回收,以便pullMessageService能及时唤醒并再次执行消息拉取

// 缓存消息条数
long cachedMessageCount = processQueue.getMsgCount().get();
// 缓存消息的大小
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 当队列中的消息跳过,超过设置 则延迟拉取消息
if (cachedMessageCount &gt; this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
  • 根据主题获取配置的订阅关系

这里通过查询 subscriptionInner Map容器,利用主题来获取对应的订阅关系,如果没有找到对应的订阅关系,则延迟拉取消息,先将 PullRequest 对象进行回收以便 pullMessageService 能及时唤醒并再次执行消息拉取

protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
    new ConcurrentHashMap<String, SubscriptionData>();
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}
  • 如果为集群模式,则从内存中读取位置

通过消费者启动的模块中,我们知道RocketMQ是根据不同模式,将消息进度存储在不同的地方

广播模式:消息进度存储在本地文件

集群模式:消息进度存储在Broker 服务器上

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    // 从内存中读取位置
    commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    if (commitOffsetValue > 0) {
        commitOffsetEnable = true;
    }
}
  • 内核中拉取消息(最重要的模块)

入口:org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl

public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

我们看到他有非常多的参数

拉取流程

  • 通过BrokerName找到对应的Broker
// step 1 通过BrokerName找到对应的Broker
FindBrokerResult findBrokerResult =
    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
        this.recalculatePullFromWhichNode(mq), false);
  • 如果没有找到对应的,则更新路由信息
// step 2 如果没有找到对应的,则更新路由信息
if (null == findBrokerResult) {
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
    findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
}
  • 检查Broker版本和Tag信息
// check version
if (!ExpressionType.isTagType(expressionType)
    &amp;&amp; findBrokerResult.getBrokerVersion() &lt; MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
  • 设置PullMessageRequestHeader
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
  • 调用pullMessage方法拉取消息,返回拉取结果
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    brokerAddr,
    requestHeader,
    timeoutMillis,
    communicationMode,
    pullCallback);

因为 CommunicationMode 传递的是ASYNC,我们着重来看一下这个方法

入口: org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync

调用 this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()

这里我们就先不细看了

拉取消息处理

  • 如果PullCallback回调成功,则对结果进行处理
// 处理pullResult数据
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
    subscriptionData);

主要做了三件事,转换消息格式、设置消息信息、放入msgFoundList

将pullResult 转成 PullResultExt,转换消息格式为List

PullResultExt pullResultExt = (PullResultExt) pullResult;
// 转换消息格式为List
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

执行消息过滤,匹配符合的tag

if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
    msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
    for (MessageExt msg : msgList) {
        if (msg.getTags() != null) {
            if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                msgListFilterAgain.add(msg);
            }
        }
    }
}

设置消息的transactionId、扩展属性、BrokerName名称,放入List中

for (MessageExt msg : msgListFilterAgain) {
    String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (Boolean.parseBoolean(traFlag)) {
        msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    }
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
        Long.toString(pullResult.getMinOffset()));
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
        Long.toString(pullResult.getMaxOffset()));
    msg.setBrokerName(mq.getBrokerName());
}
pullResultExt.setMsgFoundList(msgListFilterAgain);

当pullStatus为FOUND,消息进行提交消费的请求

  • 获取第一条消息的offset(偏移量)
// 获取第一条消息的offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
  • 将读取消息List,更新到processQueue的TreeMap里面
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

主要做了两件事,循环读取消息list,存入msgTreeMap和计算此次读取信息偏移量

public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        // 上锁
        this.treeMapLock.writeLock().lockInterruptibly();
        try {
            int validMsgCnt = 0;
            // 循环读取消息list,存入msgTreeMap
            for (MessageExt msg : msgs) {
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                if (null == old) {
                    validMsgCnt++;
                    this.queueOffsetMax = msg.getQueueOffset();
                    msgSize.addAndGet(msg.getBody().length);
                }
            }
            msgCount.addAndGet(validMsgCnt);
            if (!msgTreeMap.isEmpty() && !this.consuming) {
                dispatchToConsume = true;
                this.consuming = true;
            }
            if (!msgs.isEmpty()) {
                // 获取最后一条消息
                MessageExt messageExt = msgs.get(msgs.size() - 1);
                // 获取最大偏移量
                String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                ...
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    }
    ...
}
  • 提交消费请求,消息提交到内部的线程池
// 提交消费请求,消息提交到内部的线程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#submitConsumeRequest

获取 ConsumeRequest对象,拿到当前主题的监听器

这里拿到的监听器,就是我们在启动消费者的时候所注册的,监听到消息后执行相关的业务逻辑

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
               ...
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

在这里触发我们在一开始重写的consumeMessage方法,这里msgs用Collections.unmodifiableList进行包装,意思就是不可以修改的,是一个只读的List

ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
  • ProcessQueue中移除已经处理的消息,同时更新Offset位置
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();
        if (consumeRequest.getMsgs().isEmpty())
            return;
        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            ...
        }
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            ...
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
                // 如果存在失败消息,则过5秒在定时执行
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
                ...
        }
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        // 更新Offset位置
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore()
            .updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
  • 最后pullRequest放入pullRequestQueue中

入口:

org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

消息消费进度提交

  • 成功消费一条消息后,更新本地缓存表
  • 每5s向Broker提交消息消费进度
  • Broker每5s将进度持久化到consumerOffset.json

总结

目前只是将整体的一个消费端监听消息的流程了解清楚,里面还有许多细节需要去推敲~

以上就是详解RocketMQ 消费端如何监听消息的详细内容,更多关于RocketMQ 消费端监听消息的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMq 消息重试机制及死信队列详解

    目录 生产者消息重试 消费者消息重试 并发消费 顺序消费 并发消费和顺序消费区别 死信队列 实践出真知 公共部分创建 测试并发消费 并发消费状态 测试顺序消费 顺序消费状态 测试死信队列 死信队列特性 生产者消息重试 消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了. 有时因为网路等原因生产者也可能发送消息失败,也会进行消息重试,生产者消息重试比较简单,在springboot中只要在配置文件中配置一下就可以了. # 异步消息发送

  • RocketMQ消息发送流程源码剖析

    目录 正文 读源码 1 调用defaultMQProducerImpl.send() 2 设置过期时间 3 执行defaultMQProducerImpl.sendDefaultImpl()方法 sendDefaultImpl是发送消息的核心方法. 1 两个校验 2 获取topic路由信息 3 计算重试次数 4 执行队列选择方法 5 发送消息 正文 就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的

  • RocketMQ消息生产者是如何选择Broker示例详解

    目录 前言 从NameServer查询Topic信息 如何选择Broker 小结 前言 在RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢? 从NameServer查询Topic信息 通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sen

  • RocketMQ消息队列实现随机消息发送当做七夕礼物

    目录 正文 1 下载并启动RocketMQ 1.1 首先启动name server 1.2 然后启动Broker 2 生产者 3 消费者 正文 都在过情人节,前端的小哥哥们给女朋友画个页面,美美的,写个chrome插件,好看的,俺们后端程序员咋办. 我给媳妇写首诗,哈哈 我决定,把想对媳妇说的,今天发送到一个MQ里边,然后在七夕当天,打开消费者,将这一段话给俺媳妇看.你看,这就是我好久前对你说的话,这就是我们后端程序员的浪漫.当然也可以多发送几个,到时候跟根据topic控制到底发什么,哈哈. 这

  • RocketMQ生产者调用start发送消息原理示例

    目录 RocketMQ发送消息 start()里面究竟做了什么操作 小结 RocketMQ发送消息 我们在使用RocketMQ发送消息时,一般都会使用DefaultMQProducer,类型的代码如下: DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("42.192.50.8:9876"); try { producer.sta

  • RocketMQ生产消息与消费消息超详细讲解

    目录 1 RocketMQ简介 2 MQ的常见产品 3 环境搭建 4 单生产者单消费者模式 5 单生产者多消费者模式 5.1默认模式(负载均衡) 5.2广播模式 6 多生产者多消费者模式 1 RocketMQ简介 RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目.并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的

  • RocketMQ延迟消息简明介绍

    目录 前言 核心属性 RMQ_SYS_SCHEDULE_TOPIC FIRST_DELAY_TIME DELAY_FOR_A_WHILE DELAY_FOR_A_PERIOD delayLevelTable offsetTable 核心方法 queueId2DelayLevel delayLevel2QueueId updateOffset computeDeliverTimestamp start() shutdown() load() parseDelayLevel 前言 场景可以是这样的,

  • RocketMQ普通消息实战演练详解

    目录 引言 普通消息同步发送 普通消息异步发送 普通消息单向发送 集群消费模式 广播消费模式 引言 之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV. 相关的配置,安装和启动在这篇文章有相关讲解  https://www.jb51.net/article/260237.htm 普通消息同步发送 同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息.这个方式可以确保消息发送到Broker成功,一些重要的

  • 详解RocketMQ 消费端如何监听消息

    目录 前言 流程地图 源码跟踪 核心模块(消息拉取) 拉取流程 拉取消息处理 当pullStatus为FOUND,消息进行提交消费的请求 消息消费进度提交 总结 前言 上一篇文章中我们主要来看RocketMQ消息消费者是如何启动的, 那他有一个步骤是非常重要的,就是启动消息的监听,通过不断的拉取消息,来实现消息的监听,那具体怎么做,让我们我们跟着源码来学习一下~ 流程地图 源码跟踪 这一块的代码比较多,我自己对关键点的一些整理,这个图我画的不是很OK 核心模块(消息拉取) 入口:this.pul

  • 详解Apache配置多个监听端口和不同的网站目录

    详解Apache配置多个监听端口和不同的网站目录 一 :添加多端口 Listen 80 Listen 81 Listen 82 二:设置虚拟主机目录 NameVirtualHost *:80 <VirtualHost *:80> ServerName localhost DocumentRoot "D:/phpStudy/WWW/" </VirtualHost> NameVirtualHost *:81 <VirtualHost *:81> Serv

  • 详解vue中v-on事件监听指令的基本用法

    一.本节说明 我们在开发过程中经常需要监听用户的输入,比如:用户的点击事件.拖拽事件.键盘事件等等.这就需要用到我们下面要学习的内容v-on指令. 我们通过一个简单的计数器的例子,来讲解v-on指令的使用. 二. 怎么做 定义数据counter,用于表示计数器数字,初始值设置为0 v-on:click 表示当发生点击事件的时候,触发等号里面的表达式或者函数 表达式counter++和counter--分别实现计数器数值的加1和减1操作 语法糖:我们可以将v-on:click简写为@click 三

  • 详解Spring事件发布与监听机制

    目录 一.ApplicationContext 事件机制 二.ApplicationListener 监听器 三.ApplicationEvent 事件 四.自定义事件和监听器 五.注解式监听器 一.ApplicationContext 事件机制 ApplicationContext 事件机制采用观察者设计模式来实现,通过 ApplicationEvent 事件类和 ApplicationListener 监听器接口,可以实现 ApplicationContext 事件发布与处理. 每当 App

  • 详解SpringCloud eureka服务状态监听

    一.前言 近期由于公司不同平台项目之间的业务整合,需要做到相互访问! 每个平台均有自己的注册中心和服务,且注册中心相互之间并没有相互注册! 借助spring的事件监听,在eureka-server端监听服务注册,将所有服务的ip和port存放至redis库,然后让其他平台服务通过redis库获取ip和端口号,进而进行http调用.结构图如下: 二.事件解析 事件列表 在org.springframework.cloud.netflix.eureka.server.event包下会发现如下类: E

  • 详解Android截屏事件监听

    1. 前言 Android系统没有直接对截屏事件监听的接口,也没有广播,只能自己动手来丰衣足食,一般有三种方法. 利用FileObserver监听某个目录中资源变化情况 利用ContentObserver监听全部资源的变化 监听截屏快捷按键 由于厂商自定义Android系统的多样性,再加上快捷键的不同以及第三方应用,监听截屏快捷键这事基本不靠谱,可以直接忽略. 本文使用的测试手机,一加2(One Plus 2). 2. FileObserver 添加权限: <uses-permission an

  • 详解vuex结合localstorage动态监听storage的变化

    需求:不同组件间共用同一数据,当一个组件将数据发生变化时,其他组件也可以响应该变化. 分析:vue无法监听localstorage的变化.localstorage主要用于不同页面间传值,vue适合组件间传值.对于组件间共用同一数据又想保存住信息或者再页面刷新的时候不丢失数据(vuex在页面刷新的时候存储的值会丢失,localstorage存储在本地浏览器中),可以采用vuex+localstorage的方式. 关于vuex和storage的区别 1.最重要的区别:vuex存储在内存,locals

  • 详解SpringBoot 发布ApplicationEventPublisher和监听ApplicationEvent事件

    资料地址 Spring @Aync 实现方法 自定义需要发布的事件类,需要继承ApplicationEvent类或PayloadApplicationEvent<T>(该类也仅仅是对ApplicationEvent的一层封装) 使用@EventListener来监听事件 使用ApplicationEventPublisher来发布自定义事件(@Autowired注入即可) /** * 自定义保存事件 * @author peter * 2019/1/27 14:59 */ public cla

  • 详解RocketMQ中的消费者启动与消费流程分析

    目录 一.简介 1.1 RocketMQ 简介 1.2 工作流程 二.消费者启动流程 2.1 实例化消费者 2.2 设置NameServer和订阅topic过程 2.2.1 添加tag 2.2.2 发送心跳至Broker 2.2.3上传过滤器类至FilterServer 2.3 注册回调实现类 2.4 消费者启动 三.pull/push 模式消费 3.1 pull模式-DefaultMQPullConsumer 3.2 push模式-DefaultMQPushConsumer 3.3 小结 四.

  • Springboot详解RocketMQ实现广播消息流程

    RocketMQ消息模式主要有两种:广播模式.集群模式(负载均衡模式) 广播模式是每个消费者,都会消费消息: 负载均衡模式是每一个消费只会被某一个消费者消费一次: 我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示: 我们可以通过@RocketMQMessageListener的messageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡

随机推荐