RocketMQ获取指定消息的实现方法(源码)
概要
消息查询是什么?
消息查询就是根据用户提供的msgId从MQ中取出该消息
RocketMQ如果有多个节点如何查询?
问题:RocketMQ分布式结构中,数据分散在各个节点,即便是同一Topic的数据,也未必都在一个broker上。客户端怎么知道数据该去哪个节点上查?
猜想1:逐个访问broker节点查询数据
猜想2:有某种数据中心存在,该中心知道所有消息存储的位置,只要向该中心查询即可得到消息具体位置,进而取得消息内容
实际:
1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及该消息在CommitLog中的偏移量。
2.客户端实现会从msgId字符串中解析出broker地址,向指定broker节查询消息。
问题:CommitLog文件有多个,只有偏移量估计不能确定在哪个文件吧?
实际:单个Broker节点内offset是全局唯一的,不是每个CommitLog文件的偏移量都是从0开始的。单个节点内所有CommitLog文件共用一套偏移量,每个文件的文件名为其第一个消息的偏移量。所以可以根据偏移量和文件名确定CommitLog文件。
源码阅读
0.使用方式
MessageExt msg = consumer.viewMessage(msgId);
1.消息ID解析
这个了解下就可以了
public class MessageId { private SocketAddress address; private long offset; public MessageId(SocketAddress address, long offset) { this.address = address; this.offset = offset; } //get-set } //from MQAdminImpl.java public MessageExt viewMessage( String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { MessageId messageId = null; try { //从msgId字符串中解析出address和offset //address = ip:port //offset为消息在CommitLog文件中的偏移量 messageId = MessageDecoder.decodeMessageId(msgId); } catch (Exception e) { throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message."); } return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()), messageId.getOffset(), timeoutMillis); } //from MessageDecoder.java public static MessageId decodeMessageId(final String msgId) throws UnknownHostException { SocketAddress address; long offset; //ipv4和ipv6的区别 //如果msgId总长度超过32字符,则为ipv6 int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2; byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength)); byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8)); ByteBuffer bb = ByteBuffer.wrap(port); int portInt = bb.getInt(0); address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt); // offset byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16)); bb = ByteBuffer.wrap(data); offset = bb.getLong(0); return new MessageId(address, offset); }
2.长连接客户端RPC实现
要发请求首先得先建立连接,这里方法可以看到创建连接相关的操作。值得注意的是,第一次访问的时候可能连接还没建立,建立连接需要消耗一段时间。代码中对这个时间也做了判断,如果连接建立完成后,发现已经超时,则不再发出请求。目的应该是尽可能减少请求线程的阻塞时间。
//from NettyRemotingClient.java @Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { long beginStartTime = System.currentTimeMillis(); //这里会先检查有无该地址的通道,有则返回,无则创建 final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { //前置钩子 doBeforeRpcHooks(addr, request); //判断通道建立完成时是否已到达超时时间,如果超时直接抛出异常。不发请求 long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } //同步调用 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); //后置钩子 doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); //后置钩子 return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
下一步看看它的同步调用做了什么处理。注意到它会构建一个Future对象加入待响应池,发出请求报文后就挂起线程,然后等待唤醒(waitResponse内部使用CountDownLatch等待)。
//from NettyRemotingAbstract.javapublic RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { //请求id final int opaque = request.getOpaque(); try { //请求存根 final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); //加入待响应的请求池 this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); //将请求发出,成功发出时更新状态 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { //若成功发出,更新请求状态为“已发出” responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } //若发出失败,则从池中移除(没用了,释放资源) responseTable.remove(opaque); responseFuture.setCause(f.cause()); //putResponse的时候会唤醒等待的线程 responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); //只等待一段时间,不会一直等下去 //若正常响应,则收到响应后,此线程会被唤醒,继续执行下去 //若超时,则到达该时间后线程苏醒,继续执行 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { //正常响应完成时,将future释放(正常逻辑) //超时时,将future释放。这个请求已经作废了,后面如果再收到响应,就可以直接丢弃了(由于找不到相关的响应钩子,就不处理了) this.responseTable.remove(opaque); } }
好,我们再来看看收到报文的时候是怎么处理的。我们都了解JDK中的Future的原理,大概就是将这个任务提交给其他线程处理,该线程处理完毕后会将结果写入到Future对象中,写入时如果有线程在等待该结果,则唤醒这些线程。这里也差不多,只不过执行线程在服务端,服务执行完毕后会将结果通过长连接发送给客户端,客户端收到后根据报文中的ID信息从待响应池中找到Future对象,然后就是类似的处理了。
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> { //底层解码完毕得到RemotingCommand的报文 @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } } public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { //判断类型 switch (cmd.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } } public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { //取得消息id final int opaque = cmd.getOpaque(); //从待响应池中取得对应请求 final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { //将响应值注入到ResponseFuture对象中,等待线程可从这个对象获取结果 responseFuture.setResponseCommand(cmd); //请求已处理完毕,释放该请求 responseTable.remove(opaque); //如果有回调函数的话则回调(由当前线程处理) if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { //没有的话,则唤醒等待线程(由等待线程做处理) responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }
总结一下,客户端的处理时序大概是这样的:
结构大概是这样的:
3.服务端的处理
//todo 服务端待补充CommitLog文件映射相关内容
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } } //from NettyRemotingAbscract.java public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: //服务端走这里 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } } //from NettyRemotingAbscract.java public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { //查看有无该请求code相关的处理器 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); //如果没有,则使用默认处理器(可能没有默认处理器) final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); final RemotingResponseCallback callback = new RemotingResponseCallback() { @Override public void callback(RemotingCommand response) { doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); if (!cmd.isOnewayRPC()) { if (response != null) { //不为null,则由本类将响应值写会给请求方 response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { //为null,意味着processor内部已经将响应处理了,这里无需再处理。 } } } }; if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {//QueryMessageProcessor为异步处理器 AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1(); processor.asyncProcessRequest(ctx, cmd, callback); } else { NettyRequestProcessor processor = pair.getObject1(); RemotingCommand response = processor.processRequest(ctx, cmd); doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); callback.callback(response); } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } } //from QueryMessageProcesor.java @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.QUERY_MESSAGE: return this.queryMessage(ctx, request); case RequestCode.VIEW_MESSAGE_BY_ID: //通过msgId查询消息 return this.viewMessageById(ctx, request); default: break; } return null; } public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ViewMessageRequestHeader requestHeader = (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class); response.setOpaque(request.getOpaque()); //getMessagetStore得到当前映射到内存中的CommitLog文件,然后根据偏移量取得数据 final SelectMappedBufferResult selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset()); if (selectMappedBufferResult != null) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); //将响应通过socket写回给客户端 try { //response对象的数据作为header //消息内容作为body FileRegion fileRegion = new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()), selectMappedBufferResult); ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { selectMappedBufferResult.release(); if (!future.isSuccess()) { log.error("Transfer one message from page cache failed, ", future.cause()); } } }); } catch (Throwable e) { log.error("", e); selectMappedBufferResult.release(); } return null; //如果有值,则直接写回给请求方。这里返回null是不需要由外层处理响应。 } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("can not find message by the offset, " + requestHeader.getOffset()); } return response; }
总结
到此这篇关于RocketMQ获取指定消息的文章就介绍到这了,更多相关RocketMQ获取指定消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!