RocketMQ获取指定消息的实现方法(源码)

概要

消息查询是什么?

消息查询就是根据用户提供的msgId从MQ中取出该消息

RocketMQ如果有多个节点如何查询?

问题:RocketMQ分布式结构中,数据分散在各个节点,即便是同一Topic的数据,也未必都在一个broker上。客户端怎么知道数据该去哪个节点上查?

猜想1:逐个访问broker节点查询数据

猜想2:有某种数据中心存在,该中心知道所有消息存储的位置,只要向该中心查询即可得到消息具体位置,进而取得消息内容

实际:

1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及该消息在CommitLog中的偏移量。

2.客户端实现会从msgId字符串中解析出broker地址,向指定broker节查询消息。

问题:CommitLog文件有多个,只有偏移量估计不能确定在哪个文件吧?

实际:单个Broker节点内offset是全局唯一的,不是每个CommitLog文件的偏移量都是从0开始的。单个节点内所有CommitLog文件共用一套偏移量,每个文件的文件名为其第一个消息的偏移量。所以可以根据偏移量和文件名确定CommitLog文件。

源码阅读

0.使用方式

MessageExt  msg = consumer.viewMessage(msgId);

1.消息ID解析

这个了解下就可以了

public class MessageId {
 private SocketAddress address;
 private long offset;

 public MessageId(SocketAddress address, long offset) {
  this.address = address;
  this.offset = offset;
 }

 //get-set
}

//from MQAdminImpl.java
public MessageExt viewMessage(
 String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

 MessageId messageId = null;
 try {
  //从msgId字符串中解析出address和offset
  //address = ip:port
  //offset为消息在CommitLog文件中的偏移量
  messageId = MessageDecoder.decodeMessageId(msgId);
 } catch (Exception e) {
  throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
 }
 return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
  messageId.getOffset(), timeoutMillis);
}

//from MessageDecoder.java
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
 SocketAddress address;
 long offset;
 //ipv4和ipv6的区别
 //如果msgId总长度超过32字符,则为ipv6
 int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;

 byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
 byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
 ByteBuffer bb = ByteBuffer.wrap(port);
 int portInt = bb.getInt(0);
 address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);

 // offset
 byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
 bb = ByteBuffer.wrap(data);
 offset = bb.getLong(0);

 return new MessageId(address, offset);
}

2.长连接客户端RPC实现

要发请求首先得先建立连接,这里方法可以看到创建连接相关的操作。值得注意的是,第一次访问的时候可能连接还没建立,建立连接需要消耗一段时间。代码中对这个时间也做了判断,如果连接建立完成后,发现已经超时,则不再发出请求。目的应该是尽可能减少请求线程的阻塞时间。

//from NettyRemotingClient.java
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
 throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
 long beginStartTime = System.currentTimeMillis();
 //这里会先检查有无该地址的通道,有则返回,无则创建
 final Channel channel = this.getAndCreateChannel(addr);
 if (channel != null && channel.isActive()) {
  try {
   //前置钩子
   doBeforeRpcHooks(addr, request);
   //判断通道建立完成时是否已到达超时时间,如果超时直接抛出异常。不发请求
   long costTime = System.currentTimeMillis() - beginStartTime;
   if (timeoutMillis < costTime) {
    throw new RemotingTimeoutException("invokeSync call timeout");
   }
   //同步调用
   RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
   //后置钩子
   doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); //后置钩子
   return response;
  } catch (RemotingSendRequestException e) {
   log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
   this.closeChannel(addr, channel);
   throw e;
  } catch (RemotingTimeoutException e) {
   if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
    this.closeChannel(addr, channel);
    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
   }
   log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
   throw e;
  }
 } else {
  this.closeChannel(addr, channel);
  throw new RemotingConnectException(addr);
 }
}

下一步看看它的同步调用做了什么处理。注意到它会构建一个Future对象加入待响应池,发出请求报文后就挂起线程,然后等待唤醒(waitResponse内部使用CountDownLatch等待)。

//from NettyRemotingAbstract.javapublic RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
 final long timeoutMillis)
 throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
 //请求id
 final int opaque = request.getOpaque();

 try {
  //请求存根
  final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
  //加入待响应的请求池
  this.responseTable.put(opaque, responseFuture);
  final SocketAddress addr = channel.remoteAddress();
  //将请求发出,成功发出时更新状态
  channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
   @Override
   public void operationComplete(ChannelFuture f) throws Exception {
    if (f.isSuccess()) { //若成功发出,更新请求状态为“已发出”
     responseFuture.setSendRequestOK(true);
     return;
    } else {
     responseFuture.setSendRequestOK(false);
    }

    //若发出失败,则从池中移除(没用了,释放资源)
    responseTable.remove(opaque);
    responseFuture.setCause(f.cause());
    //putResponse的时候会唤醒等待的线程
    responseFuture.putResponse(null);
    log.warn("send a request command to channel <" + addr + "> failed.");
   }
  });

  //只等待一段时间,不会一直等下去
  //若正常响应,则收到响应后,此线程会被唤醒,继续执行下去
  //若超时,则到达该时间后线程苏醒,继续执行
  RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
  if (null == responseCommand) {
   if (responseFuture.isSendRequestOK()) {
    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
     responseFuture.getCause());
   } else {
    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
   }
  }

  return responseCommand;
 } finally {
  //正常响应完成时,将future释放(正常逻辑)
  //超时时,将future释放。这个请求已经作废了,后面如果再收到响应,就可以直接丢弃了(由于找不到相关的响应钩子,就不处理了)
  this.responseTable.remove(opaque);
 }
}

好,我们再来看看收到报文的时候是怎么处理的。我们都了解JDK中的Future的原理,大概就是将这个任务提交给其他线程处理,该线程处理完毕后会将结果写入到Future对象中,写入时如果有线程在等待该结果,则唤醒这些线程。这里也差不多,只不过执行线程在服务端,服务执行完毕后会将结果通过长连接发送给客户端,客户端收到后根据报文中的ID信息从待响应池中找到Future对象,然后就是类似的处理了。

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

 //底层解码完毕得到RemotingCommand的报文
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
  processMessageReceived(ctx, msg);
 }
}

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
 final RemotingCommand cmd = msg;
 if (cmd != null) {
  //判断类型
  switch (cmd.getType()) {
   case REQUEST_COMMAND:
    processRequestCommand(ctx, cmd);
    break;
   case RESPONSE_COMMAND:
    processResponseCommand(ctx, cmd);
    break;
   default:
    break;
  }
 }
}

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
 //取得消息id
 final int opaque = cmd.getOpaque();
 //从待响应池中取得对应请求
 final ResponseFuture responseFuture = responseTable.get(opaque);
 if (responseFuture != null) {
  //将响应值注入到ResponseFuture对象中,等待线程可从这个对象获取结果
  responseFuture.setResponseCommand(cmd);
  //请求已处理完毕,释放该请求
  responseTable.remove(opaque);

  //如果有回调函数的话则回调(由当前线程处理)
  if (responseFuture.getInvokeCallback() != null) {
   executeInvokeCallback(responseFuture);
  } else {
   //没有的话,则唤醒等待线程(由等待线程做处理)
   responseFuture.putResponse(cmd);
   responseFuture.release();
  }
 } else {
  log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
  log.warn(cmd.toString());
 }
}

总结一下,客户端的处理时序大概是这样的:

结构大概是这样的:

3.服务端的处理

//todo 服务端待补充CommitLog文件映射相关内容

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    processMessageReceived(ctx, msg);
  }
}

//from NettyRemotingAbscract.java
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
  final RemotingCommand cmd = msg;
  if (cmd != null) {
    switch (cmd.getType()) {
      case REQUEST_COMMAND: //服务端走这里
        processRequestCommand(ctx, cmd);
        break;
      case RESPONSE_COMMAND:
        processResponseCommand(ctx, cmd);
        break;
      default:
        break;
    }
  }
}

//from NettyRemotingAbscract.java
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
  //查看有无该请求code相关的处理器
  final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
  //如果没有,则使用默认处理器(可能没有默认处理器)
  final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
  final int opaque = cmd.getOpaque();

  if (pair != null) {
    Runnable run = new Runnable() {
      @Override
      public void run() {
        try {
          doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
          final RemotingResponseCallback callback = new RemotingResponseCallback() {
            @Override
            public void callback(RemotingCommand response) {
              doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
              if (!cmd.isOnewayRPC()) {
                if (response != null) { //不为null,则由本类将响应值写会给请求方
                  response.setOpaque(opaque);
                  response.markResponseType();
                  try {
                    ctx.writeAndFlush(response);
                  } catch (Throwable e) {
                    log.error("process request over, but response failed", e);
                    log.error(cmd.toString());
                    log.error(response.toString());
                  }
                } else { //为null,意味着processor内部已经将响应处理了,这里无需再处理。
                }
              }
            }
          };
          if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {//QueryMessageProcessor为异步处理器
            AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
            processor.asyncProcessRequest(ctx, cmd, callback);
          } else {
            NettyRequestProcessor processor = pair.getObject1();
            RemotingCommand response = processor.processRequest(ctx, cmd);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
            callback.callback(response);
          }
        } catch (Throwable e) {
          log.error("process request exception", e);
          log.error(cmd.toString());

          if (!cmd.isOnewayRPC()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
              RemotingHelper.exceptionSimpleDesc(e));
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
          }
        }
      }
    };

    if (pair.getObject1().rejectRequest()) {
      final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
        "[REJECTREQUEST]system busy, start flow control for a while");
      response.setOpaque(opaque);
      ctx.writeAndFlush(response);
      return;
    }

    try {
      final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
      pair.getObject2().submit(requestTask);
    } catch (RejectedExecutionException e) {
      if ((System.currentTimeMillis() % 10000) == 0) {
        log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
          + ", too many requests and system thread pool busy, RejectedExecutionException "
          + pair.getObject2().toString()
          + " request code: " + cmd.getCode());
      }

      if (!cmd.isOnewayRPC()) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
          "[OVERLOAD]system busy, start flow control for a while");
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
      }
    }
  } else {
    String error = " request type " + cmd.getCode() + " not supported";
    final RemotingCommand response =
      RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
    response.setOpaque(opaque);
    ctx.writeAndFlush(response);
    log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
  }
}

//from QueryMessageProcesor.java
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
  throws RemotingCommandException {
  switch (request.getCode()) {
    case RequestCode.QUERY_MESSAGE:
      return this.queryMessage(ctx, request);
    case RequestCode.VIEW_MESSAGE_BY_ID: //通过msgId查询消息
      return this.viewMessageById(ctx, request);
    default:
      break;
  }

  return null;
}

public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
  throws RemotingCommandException {
  final RemotingCommand response = RemotingCommand.createResponseCommand(null);
  final ViewMessageRequestHeader requestHeader =
    (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);

  response.setOpaque(request.getOpaque());

  //getMessagetStore得到当前映射到内存中的CommitLog文件,然后根据偏移量取得数据
  final SelectMappedBufferResult selectMappedBufferResult =
    this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
  if (selectMappedBufferResult != null) {
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);

    //将响应通过socket写回给客户端
    try {
      //response对象的数据作为header
      //消息内容作为body
      FileRegion fileRegion =
        new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
          selectMappedBufferResult);
      ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          selectMappedBufferResult.release();
          if (!future.isSuccess()) {
            log.error("Transfer one message from page cache failed, ", future.cause());
          }
        }
      });
    } catch (Throwable e) {
      log.error("", e);
      selectMappedBufferResult.release();
    }

    return null; //如果有值,则直接写回给请求方。这里返回null是不需要由外层处理响应。
  } else {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
  }

  return response;
}

总结

到此这篇关于RocketMQ获取指定消息的文章就介绍到这了,更多相关RocketMQ获取指定消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 使用Kotlin+RocketMQ实现延时消息的示例代码

    一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 使用延时消息的典型场景,例如: 在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消. 在电商系统中,用户七天内没有评价商品,则默认好评. 这些场景对应的解决方案,包括: 轮询遍历数据库记录 JDK 的 DelayQueue ScheduledExecutorService 基于 Quartz 的定时任务 基于 Redis 的 zset 实现延时队列. 除此之外,

  • java rocketmq--消息的产生(普通消息)

    前言 与消息发送紧密相关的几行代码: 1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); 2. producer.start(); 3. Message msg = new Message(...) 4. SendResult sendResult = producer.send(msg); 5. producer.shutdown(); 那这几行代码执行时,背后都做了什么? 一. 首先

  • RocketMq事务消息发送代码流程详解

    一.RocketMq事务消息流程: 1.首先会向broker发送一个预请求消息,消费者不可见 2.回调执行本地事务(比如操作数据库) 3.事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见.如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功. 二.RocketMq事务消息实例: 1.引入rocketMq相关的依赖: <dependency> <groupId>org.apache.ro

  • RocketMQ重试机制及消息幂代码实例解析

    这篇文章主要介绍了RocketMQ重试机制及消息幂代码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.重试机制 1.由于MQ经常处于复杂的分布式系统中,考虑网络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题.因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点.如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响.所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好

  • RocketMQ获取指定消息的实现方法(源码)

    概要 消息查询是什么? 消息查询就是根据用户提供的msgId从MQ中取出该消息 RocketMQ如果有多个节点如何查询? 问题:RocketMQ分布式结构中,数据分散在各个节点,即便是同一Topic的数据,也未必都在一个broker上.客户端怎么知道数据该去哪个节点上查? 猜想1:逐个访问broker节点查询数据 猜想2:有某种数据中心存在,该中心知道所有消息存储的位置,只要向该中心查询即可得到消息具体位置,进而取得消息内容 实际: 1.消息Id中含有消息所在的broker的地址信息(IP\Po

  • JavaScript获取指定元素位置的方法

    本文实例讲述了JavaScript获取指定元素位置的方法.分享给大家供大家参考.具体如下: 复制代码 代码如下: function showpane() {   var self = document.getElementById("eID");   var left = self.getBoundingClientRect().left + document.documentElement.scrollLeft;   var top = self.getBoundingClientR

  • 解析ConcurrentHashMap: put方法源码分析

    上一章:预热(内部一些小方法分析) put()方法是并发HashMap源码分析的重点方法,这里涉及到并发扩容,桶位寻址等等- JDK1.8 ConcurrentHashMap结构图: 1.put方法源码解析 // 向并发Map中put一个数据 public V put(K key, V value) { return putVal(key, value, false); } // 向并发Map中put一个数据 // Key: 数据的键 // value:数据的值 // onlyIfAbsent:

  • Java终止线程实例和stop()方法源码阅读

    了解线程 概念 线程 是程序中的执行线程.Java 虚拟机允许应用程序并发地运行多个执行线程. 线程特点 拥有状态,表示线程的状态,同一时刻中,JVM中的某个线程只有一种状态; ·NEW 尚未启动的线程(程序运行开始至今一次未启动的线程) ·RUNNABLE 可运行的线程,正在JVM中运行,但它可能在等待其他资源,如CPU. ·BLOCKED 阻塞的线程,等待某个锁允许它继续运行 ·WAITING 无限等待(再次运行依赖于让它进入该状态的线程执行某个特定操作) ·TIMED_WAITING 定时

  • thinkphp3.2.0 setInc方法 源码全面解析

    我们先来看一下setInc的官方示例: 需要一个字段和一个自增的值(默认为1) 我们通过下面这个例子来一步步分析他的底层是怎么实现的: <?php namespace Home\Controller; use Think\Controller; class TestController extends Controller { public function test() { $tb_test = M('test'); $tb_test->where(['id'=>1])->set

  • Vue之vue.$set()方法源码案例详解

    在使用vue开发项目的过程中,经常会遇到这样的问题:当vue的data里边声明或者已经赋值过的对象或者数组(数组里边的值是对象)时,向对象中添加新的属性,如果更新此属性的值,是不会更新视图的. 这是因为新加入的属性不是响应式的,因此不会触发视图的更新,通常使用静态方法Vue.set()或者实例方法this.$set()解决 ,使用方式: 对象:this.$set(target,key,  value) 数组:this.$set(target,index,  value) 但不管是静态方法Vue.

  • Spring SpringMVC在启动完成后执行方法源码解析

    关键字:spring容器加载完毕做一件事情(利用ContextRefreshedEvent事件) 应用场景:很多时候我们想要在某个类加载完毕时干某件事情,但是使用了spring管理对象,我们这个类引用了其他类(可能是更复杂的关联),所以当我们去使用这个类做事情时发现包空指针错误,这是因为我们这个类有可能已经初始化完成,但是引用的其他类不一定初始化完成,所以发生了空指针错误,解决方案如下: 1.写一个类继承spring的ApplicationListener监听,并监控ContextRefresh

  • 解析ConcurrentHashMap: transfer方法源码分析(难点)

    上一篇文章介绍过put方法以及其相关的方法,接下来,本篇就介绍一下transfer这个方法(比较难),最好能动手结合着源码进行分析,并仔细理解前面几篇文章的内容~ 注:代码分析的注释中的CASE0.CASE1- ,这些并没有直接关联关系,只是为了给每个if逻辑判断加一个标识,方便在其他逻辑判断的地方进行引用而已. 再复习一下并发Map的结构图: 1.transfer方法 transfer方法的作用是:迁移元素,扩容时table容量变为原来的两倍,并把部分元素迁移到其它桶nextTable中.该方

  • Netty启动步骤绑定端口示例方法源码分析

    目录 绑定端口 我们继续跟第一小节的最初的doBind()方法 第二步, 获得channel 重点关注下doBind(localAddress)方法 最终会走到这一步, pipeline.fireChannelActive() 章节总结 前文传送门:Netty启动流程注册多路复用源码解析 绑定端口 上一小节我们学习了channel注册在selector的步骤, 仅仅做了注册但并没有监听事件, 事件是如何监听的呢? 我们继续跟第一小节的最初的doBind()方法 private ChannelFu

  • CentOS 6.3 安装配置Apache2.2.6的方法(源码编译安装)

    安装说明 安装环境:CentOS-6.3 安装方式:源码编译安装 软件:httpd-2.2.6.tar.gz | pcre-8.32.tar.gz | apr-1.4.6.tar.gz | apr-util-1.5.1.tar.gz 下载地址:http://mirror.bjtu.edu.cn/apache/httpd/ http://apr.apache.org/download.cgi http://jaist.dl.sourceforge.net/project/pcre/pcre 安装位

随机推荐