RocketMQ producer同步发送单向发送源码解析

目录
  • RocketMQ生产者发送消息分为三种模式
  • 1. 同步发送
    • 1.1 DefaultMQProducerImpl#sendDefaultImpl
    • 1.2 DefaultMQProducerImpl#sendKernelImpl
    • 1.3 MQClientAPIImpl#sendMessage
    • 1.4 MQClientAPIImpl#sendMessageSync
    • 1.5 NettyRemotingClient#invokeSync
  • 2. 单向发送
    • 2.1 DefaultMQProducerImpl#sendOneway

RocketMQ生产者发送消息分为三种模式

RocketMQ生产者发送消息分为三种模式,分别是同步发送,异步发送和单向发送。

  • 单向发送,这个就是发送之后不用接收结果的,就是你发出去一个消息,然后就返回了,就算有结果返回也不会接收了,这是站在消息生产者的角度;
  • 同步发送的话,就是发出去一个消息,这个线程要等着它返回消息发送结果,然后你这个线程再根据这个消息发送结果再做一些业务操作等等;
  • 异步发送,这个就是在你发送消息之前要给一个callback,发送的时候,你这个线程就不用等着,该干什么就干什么,然后发送结果回来的时候,是由其他线程调用你这个callback来处理的,你可以把这个callback看作是一个回调函数,回调方法,这个方法里面的业务逻辑就是你对这个消息发送结果的处理。注意,本文介绍的消息发送只是普通的消息发送,那种事务类型的消息,我们以后会有介绍。

1. 同步发送

producer同步发送消息的示例在org.apache.rocketmq.example.simple.Producer类中,代码如下:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1. 创建 DefaultMQProducer 对象
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Launch the instance.
         */
        // todo 2. 启动 producer
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 3. 发送消息
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            ...
        }
        producer.shutdown();
    }
}

我们可以看到这个代码,你是同步消息你是需要在你自己的业务线程里面接收这个sendResult的,然后在做一些业务处理,比如我这里就是打印了一下这个sendResult。

接下来我们看下它是怎样发送的,这里是调用了这个producer的send方法。

@Override
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // topic 和消息长度 校验
    Validators.checkMessage(msg, this);
    msg.setTopic(withNamespace(msg.getTopic()));
    // todo
    return this.defaultMQProducerImpl.send(msg);
}

我们可以看到,这个 DefaultMQProducer 将这个消息给了defaultMQProducerImpl 这个实现的send方法来处理了。

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // todo 默认超时时间3s
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

defaultMQProducerImpl的send方法,加了个超时时间 ,然后有调用它的重载方法send(msg,timeout)

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // todo 同步模式
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

这个send(msg,timeout)又调用了sendDefaultImpl 方法,然后他这里加了个通信模式是同步,CommunicationMode.SYNC。

1.1 DefaultMQProducerImpl#sendDefaultImpl

sendDefaultImpl 方法就比较长了了我们分成几部分来介绍:

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 判断状态是否是running
    this.makeSureStateOK();
    // 检查消息合法性
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 随机的invokeID
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // todo 获取topic信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    ...
}

这一小段代码其实就是做了一些准备检查工作,注意第二行的个检查消息合法性,它要检查你topic,消息长度的,你不能发空消息,消息长度也不能太长,默认是不超过4m,接下来这些就是记录一下时间了,再看最后一行,就是根据你这个消息发送的topic,然后获取topic 发送消息的这么一个信息,这里面就有这topic 有几个MessageQueue,然后每个MessageQueue对应在哪个broker上面,broker 的地址又是啥的,它这个方法会先从本地的一个缓存中获取下,没有的话就从nameserv更新下这个本地缓存,再找找,要是再找不到,它就认为你没有这个topic了,然后就去nameserv上面拉取一个默认topic的一些配置信息给你用(这个其实就是在新建一个topic)。 接着这个方法往下看,接着就是判断 这个TopicPublishInfo 是否存在了,如果不存在的话就抛出异常了,没有后续了就,如果存在的话:

...
if (topicPublishInfo != null && topicPublishInfo.ok()) {
    boolean callTimeout = false;
    MessageQueue mq = null;
    Exception exception = null;
    SendResult sendResult = null;
    // 重试次数 区分同步、其他
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    // 存放发送过的broker name
    String[] brokersSent = new String[timesTotal];
    // 重试发送
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        // todo 选择message queue
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }
                // todo 进行发送
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                endTimestamp = System.currentTimeMillis();
                // todo isolation 参数为false(看一下异常情况)
                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                switch (communicationMode) {
                    case ASYNC:
                        return null;
                    case ONEWAY:
                        return null;
                    case SYNC:
                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                continue;
                            }
                        }
                        return sendResult;
                    default:
                        break;
                }
            } catch (RemotingException e) {
                endTimestamp = System.currentTimeMillis();
                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                log.warn(msg.toString());
            ...

其实下面还有许多处理异常的操作没有放上,不过不影响我们的主流程,先是判断你这个通信模式,如果是同步的话,默认重试次数就是2 ,然后加上本身这次请求,也就是最查请求3次。这个for循环就是失败重试的代码,再看下代码selectOneMessageQueue这个就是选择一个MesssageQueue的方法了,这个是比较重要的,这里我们先不说,你可以把它理解为 我们的负载均衡。接着往下走,就是判断一下时间了,计算一下剩下的时间, 如果这一堆前面的内容耗时很长,然后已经超了之前设置的默认超时时间,这个时候就会超时了,然后将这个calltimeout设置成true了。

1.2 DefaultMQProducerImpl#sendKernelImpl

接着就是进行发送了调用sendKernelImpl 方法:

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // 根据MessageQueue获取Broker的网络地址
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    SendMessageContext context = null;
    ...

这个sendKernelImpl 也是有点长,然后我们一部分一部分的看下,这就是根据MessageQueue里面的broker name 获取一下broker addr,他这个broker addr 选的是master的,比如说我们 broker使用的是 master/slave 高可用架构,这个时候只会选择那个master,毕竟是往里面写消息,然后只能用master,等到介绍消息消费者的时候,消息消费者是可以向slave node 获取消息消费的,前提是 master 负载比较大,然后消息消费者下次获取消费的消息已经在slave里面了,然后消息消费者获取到消息之后,它里面有个字段是告诉你下次可以去xxx 地址的broker 拉取消息,这个我们介绍到消息消费者的时候再说。

接着回来,如果没有获取到这个broker 地址的话,就是去nameserv上更新下本地缓存,然后再获取下。接着再往下就是再次判断一下这个broker addr 了,如果还没有就抛出异常,如果有的话 就执行下面的代码了:

...
SendMessageContext context = null;
if (brokerAddr != null) {
    brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    byte[] prevBody = msg.getBody();
    try {
        //for MessageBatch,ID has been set in the generating process
        // 给消息设置全局唯一id, 对于MessageBatch在生成过程中已设置了id
        if (!(msg instanceof MessageBatch)) {
            MessageClientIDSetter.setUniqID(msg);
        }
        boolean topicWithNamespace = false;
        if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
            msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
            topicWithNamespace = true;
        }
        int sysFlag = 0;
        // 消息体是否压缩
        boolean msgBodyCompressed = false;
        // 压缩消息 内容部分超了4k就会压缩
        if (this.tryToCompressMessage(msg)) {
            sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            msgBodyCompressed = true;
        }
        final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
            sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
        }
        // 判断有没有hook
        if (hasCheckForbiddenHook()) {
            CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
            checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
            checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
            checkForbiddenContext.setCommunicationMode(communicationMode);
            checkForbiddenContext.setBrokerAddr(brokerAddr);
            checkForbiddenContext.setMessage(msg);
            checkForbiddenContext.setMq(mq);
            checkForbiddenContext.setUnitMode(this.isUnitMode());
            // 执行Forbidden 钩子
            this.executeCheckForbiddenHook(checkForbiddenContext);
        }
        ...

第一句,这个其实就是进行一个vip通道地址的转换,这个比较有意思,如果你这个支持vip channel的话,它会把broker addr 里面的端口改变一下,这个所谓的vip channel ,其实就是与它的另一个端口建立连接,这个端口就是当前端口-2 ;

接着,如果这个消息不是批量消息的话,我们就给这个消息设置一个唯一的消息id,再往下就是 sysflag的处理了,这个sysflag里面记录了好几个属性值,使用二进制来处理的,比如说消息是否压缩了(这个压缩,就是你消息内容超过了默认的4k之后,就会进行压缩,这个压缩的阈值你是可以配置的),是否是个事务消息等等。 接下来就是执行hook了,这个hook就是forbidenHook ,其实就是对消息进行过滤。

...
if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }
    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
}
// 封装消息头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 设置group
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// topic
requestHeader.setTopic(msg.getTopic());
// 设置默认topic
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// 设置默认topic的队列数量 默认4个
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// 队列id
requestHeader.setQueueId(mq.getQueueId());
// 消息系统标记
requestHeader.setSysFlag(sysFlag);
// 消息发送时间
requestHeader.setBornTimestamp(System.currentTimeMillis());
// 消息标记(RocketMQ对消息标记不做任何处理,供应用程序使用)
requestHeader.setFlag(msg.getFlag());
// 设置扩展属性
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// 是否批量
requestHeader.setBatch(msg instanceof MessageBatch);
// 判断消息是否是 %RETRY% 开头
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }
    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}
...

在往下就是执行一下发送消息之前的hook,再往下就是封装发送消息请求头,然后这个请求头里面就涵盖了很多的参数,比如说topic,MessageQueue 队列Id, 出生日期,flag等等。再往下就是消息发送了

...
SendResult sendResult = null;
// 同步 异步  单向
switch (communicationMode) {
    // 异步
    case ASYNC:
        Message tmpMessage = msg;
        boolean messageCloned = false;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
        }
        if (topicWithNamespace) {
            if (!messageCloned) {
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
        // 判断超时时间
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        // todo
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
        // 单向
    case ONEWAY:
        // 同步
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        // 判判是否超时
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        // todo 交给 mq api去发送消息
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}
// 是否注册了消息发送钩子函数
if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}
...

因为本小节主要是介绍下这个同步发送消息,然后我们就主要介绍下这个sync的代码逻辑: 首先是判断超时,然后交给 MQClientAPI层去处理,然后返回sendResult。

1.3 MQClientAPIImpl#sendMessage

我们这里接着看下MQClientAPIImpl里面的sendMessage 实现:

public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;
    String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
    boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
    if (isReply) {
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
        }
    } else {
        // sendSmartMsg默认开启,也算一种优化吧 批量消息
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            // 普通消息
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
    }
    // 设置消息体
    request.setBody(msg.getBody());
    switch (communicationMode) {
        case ONEWAY:
            // todo
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            // 判断超时时间
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // todo
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            // 判断超时时间
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // todo 同步发送
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }
    return null;
}

这里先生成一个RemotingCommand 这么个实体对象,然后RequestCode就是SEND_MESSAGE,其实这里判断了一下sendSmartMsg 这个参数,把requestHeader优化了一下,然后换成了requestHeaderV2,其实这个requestHeaderV2 内容跟requestHeader一样,但是变量名是单个字母的,然后序列化,反序列化,传输内容都有所优化,其实他这个序列化使用是json形式的,然后想想就知道有些哪些好处了, 唯一的缺点就是可读性差点,但是这个玩意是对用户透明的,用户不需要关心。

接着就是判断通信类型,然后发送消息了,这里是同步发送,先是判断一下超时时间,接着就是调用sendMessageSync 进行同步发送了,我们接着来看下这个sendMessageSync 方法实现。

1.4 MQClientAPIImpl#sendMessageSync

private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    // todo 同步调用
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    // 处理响应
    return this.processSendResponse(brokerName, msg, response,addr);
}

这里就调用到了client 模块(这个client其实就是直接操作netty了)来处理了,然后返回响应,调用processSendResponse 方法来处理响应。

1.5 NettyRemotingClient#invokeSync

我们再来看下client的 invokeSync 方法:

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    // 开始时间
    long beginStartTime = System.currentTimeMillis();
    // todo 轮询获取namesrv地址Channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            // 执行开始之前的rpchook
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            // 判断超时 之前有获取链接的操作,可能会出现超时的情况
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // todo 进行同步执行,获取响应
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            // 执行之后的rpchook
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
            // 远程发送请求异常
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            // 关闭channel
            this.closeChannel(addr, channel);
            throw e;
            // 超时异常
        } catch (RemotingTimeoutException e) {
            // 如果超时 就关闭cahnnel话,就关闭channel 默认是不关闭的
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

这里有两个点需要关注下,首先是根据broker addr 这个地址获取一下对应的channel ,如果不存在的话就创建一下这个连接, 稍微看下这块的代码:

private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
    // 如果地址不存在,就返回namesrv 的channel
    if (null == addr) {
        return getAndCreateNameserverChannel();
    }
    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }
    // 创建channel
    return this.createChannel(addr);
}

如果你这个addr是空的话,这个就是默认找nameserv的addr ,然后找对应channel就可以了,如果不是null ,然后它会去这个channelTable 这个map中去找,如果没有的话就创建一个对应的channel

接着回到这个invokeSync 方法中,获得channel之后,就是执行一下rpcHook了,这东西就是你在创建MQProducer的时候设置的,在调用前执行一次,调用后执行一次,其实你就可以通过这个hook来实现很多功能,监控的功能比较多些。接着就是调用了invokeSyncImpl 这个实现方法来发送消息了,这个方法是它的一个父类里面的:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    // 获取 请求id
    final int opaque = request.getOpaque();
    try {
        // 创建ResponseFuture
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        // 放入responseTable 表中
        this.responseTable.put(opaque, responseFuture);
        // 获取远程地址
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                // 成功
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                // 失败
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                // 移除response中的缓存
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            // 成功了还是null  还是超时
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                // 没发出去,就排除异常
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }
        // 返回响应结果
        return responseCommand;
    } finally {
        // 移除
        this.responseTable.remove(opaque);
    }
}

这个方法其实就是最终往 channel里面写内容的方法了,我们来看下,先是为这次request创建一个id 吧,这个id主要用来返回响应的时候用的。

接着创建一个ResposeFuture ,这个东西异步,同步都可以用,这个一会介绍一下它的原理,接着就是将这个id 与这个 ResposeFuture 关联起来放到这个 responseTable 里面的, 接着就是往channel里面发送消息了,这里它添加一个listener ,这listener的执行时机就是发送出去的时候,最后就是等待这个响应了。

我们来解释下这个ResposeFuture 原理, 当执行了responseFuture.waitResponse(timeoutMillis); 这行代码,当前线程就会wait ,然后被阻塞,然后等着响应回来的时候,netty处理响应的线程会从响应里面获取一下这个opaque这个id,就是请求之前在request生成的,broker 在响应的时候会会把这个id 放回到response 中, 然后会根据这个opaque 从responseTable中找到这个 ResposeFuture ,然后把响应设置到这个里面,最后唤醒一下wait在这个对象里面的线程就可以了,这样你这个业务线程就得到了这个RemotingResponse 了。 好了,到这我们就解释清楚了,然后我们看下他这个代码是怎样实现的:

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    // 获取对应id 的responseFuture
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        // 设置
        responseFuture.setResponseCommand(cmd);
        // 从响应表中移除
        responseTable.remove(opaque);
        if (responseFuture.getInvokeCallback() != null) {
            // todo 执行回调
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

不过它这个ResposeFuture 是使用CountDownLatch 来实现这个wait与唤醒的。我们来具体看下这个 waitResponse方法与这个putResponse方法:

public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
    this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    return this.responseCommand;
}
public void putResponse(final RemotingCommand responseCommand) {
    this.responseCommand = responseCommand;
    this.countDownLatch.countDown();
}

2. 单向发送

单向发送其实这块跟同步发送的流程差不多,我们来看下它的生产者代码是怎样写的: org.apache.rocketmq.example.openmessaging.SimpleProducer:

public static void main(String[] args) {
    final MessagingAccessPoint messagingAccessPoint =
        OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
    final Producer producer = messagingAccessPoint.createProducer();
    messagingAccessPoint.startup();
    System.out.printf("MessagingAccessPoint startup OK%n");
    producer.startup();
    System.out.printf("Producer startup OK%n");
    ...
    {
        producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
        System.out.printf("Send oneway message OK%n");
    }
   ...
}

可以看到我们最后发送的时候调用的是sendOneway方法,这个方法是没有返回值的。

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    this.defaultMQProducerImpl.sendOneway(msg);
}

2.1 DefaultMQProducerImpl#sendOneway

这里就是调用了defaultMQProducerImpl的 sendOneway方法

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    this.defaultMQProducerImpl.sendOneway(msg);
}

这里需要注意的是它也是调用了sendDefaultImpl 方法,然后通信方式是oneway 。这里我们就不细说了,可以看下同步方法解析这个方法的说明,这里唯一要提一点是单向发送是没有这个重试的,然后就发送一次。下面的流程都是一样的,然后就到了这个MQClientAPIImpl 的 sendMessage 方法

...
switch (communicationMode) {
    case ONEWAY:
        // todo
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    ...

然后他这个是又调用了NettyRemotingClient 的 invokeOneway 方法:

public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
    RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null &amp;&amp; channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

这里也是根据broker addr 获取channel, 如果没有的话,也是创建一个,接着就是执行这个rpc调用前的hook ,注意这里没有调用后的一个hook,因为我们并不知道它是什么情况。 接着又调用了invokeOnewayImpl 方法:

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // 请求体, 标记是一个单向调用
    request.markOnewayRPC();
    // 获取凭证
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // 释放信号量
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            // 释放信号量
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

这里使用了semaphore进行限流,然后默认的话是同时支持65535 个请求发送的,这个semaphore 限流只有单向发送与这个异步发送会有,接着就会将这个request写入channel中,然后add了一个listener ,这个listener执行时机就是消息发送出去了,这个时候就会释放 信号量。

到这我们这个单向发送就解析完成了。

参考文章

RocketMQ4.8注释github地址

RocketMQ源码分析专栏

以上就是RocketMQ producer同步发送单向发送源码解析的详细内容,更多关于RocketMQ producer的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ producer容错机制源码解析

    目录 1. 前言 2. 失败重试 3. 延迟故障 3.1 最普通的选择策略 3.2 延迟故障的实现 1. 前言 本文主要是介绍一下RocketMQ消息生产者在发送消息的时候发送失败的问题处理?这里有两个点,一个是关于消息的处理,一个是关于broker的处理,比如说发送消息到broker-a的broker失败了,我们可能下次就不想发送到这个broker-a,这就涉及到一个选择broker的问题,也就是选择MessageQueue的问题. 2. 失败重试 其实失败重试我们在介绍RocketMQ消息生

  • RocketMQ broker文件清理源码解析

    目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2. 源码解析 2.1 清理commitlog 2.2 ConsumeQueue 清理 2.3 indexFile 清理 3. 总结 1. broker 清理文件介绍 本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈 1.1 哪些文件需要清理 首先我们需要介绍下在RocketMQ中哪些文件需要清理,其实可以想一想,在RocketMQ

  • RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

    目录 前言 ConsumeQueue详解 IndexFile详解 IndexHeader slots槽位 indexes索引数据 实时更新ConsumeQueue与IndexFile源码分析 CommitLogDispatcherBuildConsumeQueue源码分析 CommitLogDispatcherBuildIndex源码分析 IndexFile如何解决Hash冲突 总结 前言 前面我们介绍了消息是如何存储的,消息是如何刷盘的,讲的都是CommitLog是如何存储和刷盘的.虽然Com

  • RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

    目录 RocketMq消息处理 1. 处理PULL_MESSAGE请求 2. 获取消息 3. 挂起请求:PullRequestHoldService#suspendPullRequest 3.1 处理挂起请求的线程:PullRequestHoldService 3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup 3.3 消息分发中唤醒consumer请求 总结 RocketMq消息处理 RocketMq消息处理整个流程如下: 本系列Roc

  • RocketMQ源码解析topic创建机制详解

    目录 1. RocketMQ Topic创建机制 2. 自动Topic 3. 手动创建--预先创建 通过界面控制台创建 1. RocketMQ Topic创建机制 以下源码基于Rocket MQ 4.7.0 RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建.可以通过设置broker的配置文件来禁用或者允许自动创建.默认是开启的允许自动创建 autoCreateTopicEnable=true/false 下面会结合源码来深度分析一下自动创建和手动创建的过程. 2. 自动T

  • Java同步锁Synchronized底层源码和原理剖析(推荐)

    目录 1 synchronized场景回顾 2 反汇编寻找锁实现原理 3 synchronized虚拟机源码 3.1 HotSpot源码Monitor生成 3.2 HotSpot源码之Monitor竞争 3.3 HotSpot源码之Monitor等待 3.4 HotSpot源码之Monitor释放 1 synchronized场景回顾 目标:synchronized回顾(锁分类–>多线程)概念synchronized:是Java中的关键字,是一种同步锁.Java中锁分为以下几种:乐观锁.悲观锁(

  • ReentrantLock从源码解析Java多线程同步学习

    目录 前言 管程 管程模型 MESA模型 主要特点 AQS 共享变量 资源访问方式 主要方法 队列 node节点等待状态 ReentrantLock源码分析 实例化ReentrantLock 加锁 A线程加锁成功 B线程尝试加锁 释放锁 总结 前言 如今多线程编程已成为了现代软件开发中的重要部分,而并发编程中的线程同步问题更是一道难以逾越的坎.在Java语言中,synchronized是最基本的同步机制,但它也存在着许多问题,比如可重入性不足.死锁等等.为了解决这些问题,Java提供了更加高级的

  • Netty实战源码解析NIO编程

    目录 1 前言 2 Netty是什么? 3 Java I/O模型简介 3.1 BIO代码实现 4 Java NIO 4.1 基本介绍 4.2 三大核心组件的关系 4.3 Buffer缓冲区 4.4 Channel通道 4.5 Selector选择器 4.5.1 Selector的创建 4.5.2 注册Channel到Selector 4.5.3 SelectionKey 4.5.4 从Selector中选择Channel 4.5.5 停止选择的方法 4.5.6 NIO客户端.服务端 5 Java

  • Android图片加载利器之Picasso源码解析

    看到了这里,相信大家对Picasso的使用已经比较熟悉了,本篇博客中将从基本的用法着手,逐步的深入了解其设计原理. Picasso的代码量在众多的开源框架中算得上非常少的一个了,一共只有35个class文件,但是麻雀虽小,五脏俱全.好了下面跟随我的脚步,出发了. 基本用法 Picasso.with(this).load(imageUrl).into(imageView); with(this)方法 public static Picasso with(Context context) { if

  • Android okhttp的启动流程及源码解析

    前言 这篇文章主要讲解了okhttp的主要工作流程以及源码的解析. 什么是OKhttp 简单来说 OkHttp 就是一个客户端用来发送 HTTP 消息并对服务器的响应做出处理的应用层框架. 那么它有什么优点呢? 易使用.易扩展. 支持 HTTP/2 协议,允许对同一主机的所有请求共用同一个 socket 连接. 如果 HTTP/2 不可用, 使用连接池复用减少请求延迟. 支持 GZIP,减小了下载大小. 支持缓存处理,可以避免重复请求. 如果你的服务有多个 IP 地址,当第一次连接失败,OkHt

  • JetCache 缓存框架的使用及源码解析(推荐)

    目录 一.简介 为什么使用缓存? 使用场景 使用规范 二.如何使用 引入maven依赖 添加配置 配置说明 注解说明 @EnableCreateCacheAnnotation @EnableMethodCache @CacheInvalidate @CacheUpdate @CacheRefresh @CachePenetrationProtect @CreateCache 三.源码解析 项目的各个子模块 常用注解与变量 缓存API Cache接口 AbstractCache抽象类 Abstra

  • Project Reactor源码解析publishOn使用示例

    目录 功能分析 代码示例 prefetch delayError 源码分析 Flux#publishOn() Flux#subscribe() FluxPublishOn#subscribeOrReturn() FluxPublishOn#onSubscribe() 非融合 FluxPublishOn#onNext() FluxPublishOn#trySchedule() FluxPublishOn#run() FluxPublishOn#runAsync() FluxPublishOn#ch

  • Mango Cache缓存管理库TinyLFU源码解析

    目录 介绍 整体架构 初始化流程 读流程 写流程 事件处理机制 主流程 write 清理工作 缓存管理 什么是LRU? 什么是SLRU? 什么是TinyLFU? mango Cache中的TinyLFU counter counter的初始化 counter的使用 lruCache slruCache filter TinyLFU的初始化 TinyLFU写入 TinyLFU访问 增加entry的访问次数 估计entry访问次数 总结 介绍 据官方所述,mango Cache是对Guava Cac

  • Flink JobGraph生成源码解析

    目录 引言 概念 JobGraph生成 生成hash值 生成chain 总结 引言 在DataStream基础中,由于其中的内容较多,只是介绍了JobGraph的结果,而没有涉及到StreamGraph到JobGraph的转换过程.本篇我们来介绍下JobGraph的生成的详情,重点是Operator可以串联成Chain的条件 概念 首先我们来回顾下JobGraph中的相关概念 JobVertex:job的顶点,即对应的计算逻辑(这里用的是Vertex, 而前面用的是Node,有点差异),通过in

随机推荐