[GitHub] [rocketmq] lijunyong commented on issue #362: deploy ha cluster: Slave fall behind master: 180 bytes
lijunyong commented on issue #362: deploy ha cluster: Slave fall behind master: 180 bytes URL: https://github.com/apache/rocketmq/issues/362#issuecomment-544360464 @shibd I ran into this as well. How did you solve it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [rocketmq] coveralls edited a comment on issue #1422: [RIP-16]Support request/response pattern
coveralls edited a comment on issue #1422: [RIP-16]Support request/response pattern URL: https://github.com/apache/rocketmq/pull/1422#issuecomment-531618889 [![Coverage Status](https://coveralls.io/builds/26429718/badge)](https://coveralls.io/builds/26429718) Coverage increased (+0.7%) to 50.23% when pulling **01827c8dd3a4e375cdea4e802f93c802ec43ea83 on qqeasonchen:rocketmq-dev-rpc** into **61f4f99b884d6b1d716414b14ed2e494d201bebe on apache:develop**.
[GitHub] [rocketmq-client-go] wolftankk opened a new issue #259: [native] close consumer lock when the client has shutdown
wolftankk opened a new issue #259: [native] close consumer lock when the client has shutdown URL: https://github.com/apache/rocketmq-client-go/issues/259
[GitHub] [rocketmq-client-go] zenghur commented on issue #257: [Master] ReConsumeLater don't work as expected.
zenghur commented on issue #257: [Master] ReConsumeLater don't work as expected. URL: https://github.com/apache/rocketmq-client-go/issues/257#issuecomment-544339649 > could you please share the log? There is no log. Returning `ReConsumeLater` behaves like returning `ConsumeSuccess` and I haven't seen the message again.
[GitHub] [rocketmq-client-go] wolftankk opened a new pull request #258: consumerGroup cannot auto create
wolftankk opened a new pull request #258: consumerGroup cannot auto create URL: https://github.com/apache/rocketmq-client-go/pull/258 For more detail, see issue #194.
[GitHub] [rocketmq-operator] bix29 opened a new issue #13: does storageMode support ceph ?
bix29 opened a new issue #13: does storageMode support ceph ? URL: https://github.com/apache/rocketmq-operator/issues/13 I want to use an existing Ceph cluster as storage. How do I configure it?
[GitHub] [rocketmq-externals] xujianhai666 opened a new issue #440: [Connector] Support partial task worker restart
xujianhai666 opened a new issue #440: [Connector] Support partial task worker restart URL: https://github.com/apache/rocketmq-externals/issues/440

The issue tracker is **ONLY** used for bug reports and feature requests. For any question or RocketMQ proposal, please use our [mailing lists](http://rocketmq.apache.org/about/contact/).

**BUG REPORT**

1. Please describe the issue you observed:
- What did you do (The steps to reproduce)?
- What did you expect to see?
- What did you see instead?
2. Please tell us about your environment:
3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):

**FEATURE REQUEST**

1. Please describe the feature you are requesting.
2. Provide any additional detail on your proposed use case for this feature.
3. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?
4. If there are some sub-tasks using -[] for each subtask and create a corresponding issue to map to the sub task:
- [sub-task1-issue-number](example_sub_issue1_link_here): sub-task1 description here,
- [sub-task2-issue-number](example_sub_issue2_link_here): sub-task2 description here,
- ...
[GitHub] [rocketmq-externals] xujianhai666 opened a new issue #439: [Replicator] Add topic->task assign agl
xujianhai666 opened a new issue #439: [Replicator] Add topic->task assign agl URL: https://github.com/apache/rocketmq-externals/issues/439
[GitHub] [rocketmq] YngJian commented on issue #1542: Some questions about the location of consumers and the automatic deletion of messages
YngJian commented on issue #1542: Some questions about the location of consumers and the automatic deletion of messages URL: https://github.com/apache/rocketmq/issues/1542#issuecomment-544327966 Thanks. One more question: my consumer is configured in broadcasting mode. Messages are sent successfully, but sometimes consumption does not succeed for a long time, and this happens fairly often. Yet when I check the dead-letter queue in the management console, the message is not there. What could be causing this?
[GitHub] [rocketmq] duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern
duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern
URL: https://github.com/apache/rocketmq/pull/1422#discussion_r336819082

## File path: client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java ##
@@ -213,4 +221,79 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
         return response;
     }
+
+    private RemotingCommand receiveReplyMssage(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        long receiveTime = System.currentTimeMillis();
+        ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class);
+
+        try {
+            MessageExt msg = new MessageExt();
+            msg.setTopic(requestHeader.getTopic());
+            msg.setQueueId(requestHeader.getQueueId());
+            msg.setStoreTimestamp(requestHeader.getStoreTimestamp());
+
+            if (requestHeader.getBornHost() != null) {
+                String[] bornHostArr = requestHeader.getBornHost().split("/");
+                String bornHost/*ip:port*/ = bornHostArr[bornHostArr.length - 1];
+                String[] host = bornHost.split(":");
+                if (host.length == 2)
+                    msg.setBornHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
+            }
+
+            if (requestHeader.getStoreHost() != null) {
+                String[] storeHostArr = requestHeader.getStoreHost().split("/");
+                String storeHost = storeHostArr[storeHostArr.length - 1];
+                String[] host = storeHost.split(":");
+                if (host.length == 2)
+                    msg.setStoreHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
+            }
+
+            byte[] body = request.getBody();
+            if ((requestHeader.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
+                try {
+                    body = UtilAll.uncompress(body);
+                } catch (IOException e) {
+                    log.warn("err when uncompress constant", e);
+                }
+            }
+            msg.setBody(body);
+            msg.setFlag(requestHeader.getFlag());
+            MessageAccessor.setProperties(msg, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REPLY_MESSAGE_ARRIVE_TIME, String.valueOf(receiveTime));
+            msg.setBornTimestamp(requestHeader.getBornTimestamp());
+            msg.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+            log.debug("receive reply message :{}", msg);
+
+            processReplyMessage(msg);
+
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } catch (Exception e) {
+            log.warn("unknown err when receiveReplyMsg", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("process reply message fail");
+        }
+        return response;
+    }
+
+    private void processReplyMessage(MessageExt replyMsg) {
+        final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
+        final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
+        if (requestResponseFuture != null) {
+            requestResponseFuture.putResponseMessage(replyMsg);
+
+            RequestFutureTable.getRequestFutureTable().remove(correlationId);
+
+            if (requestResponseFuture.getRequestCallback() != null) {
+                requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
+            } else {
+                requestResponseFuture.putResponseMessage(replyMsg);
+            }
+        } else {
+            log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s", correlationId));

Review comment: Just a suggestion for tracing whether the reply is in line with expectations; the msgId of the reply message may be the better choice.
[GitHub] [rocketmq] qqeasonchen commented on a change in pull request #1422: [RIP-16]Support request/response pattern
qqeasonchen commented on a change in pull request #1422: [RIP-16]Support request/response pattern
URL: https://github.com/apache/rocketmq/pull/1422#discussion_r336818173

## File path: client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java ##
+    private void processReplyMessage(MessageExt replyMsg) {
+        final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
+        final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
+        if (requestResponseFuture != null) {
+            requestResponseFuture.putResponseMessage(replyMsg);
+
+            RequestFutureTable.getRequestFutureTable().remove(correlationId);
+
+            if (requestResponseFuture.getRequestCallback() != null) {
+                requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
+            } else {
+                requestResponseFuture.putResponseMessage(replyMsg);
+            }
+        } else {
+            log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s", correlationId));

Review comment: Is the msgId of request message or reply message?
[GitHub] [rocketmq-client-go] ShannonDing commented on issue #252: [Native] receiveResponse use to many memory
ShannonDing commented on issue #252: [Native] receiveResponse use to many memory URL: https://github.com/apache/rocketmq-client-go/issues/252#issuecomment-544319159 link #256
[GitHub] [rocketmq-client-go] ShannonDing commented on issue #253: V1.2.0 version consumer return ReConsumeLater but not msg reveive later
ShannonDing commented on issue #253: V1.2.0 version consumer return ReConsumeLater but not msg reveive later URL: https://github.com/apache/rocketmq-client-go/issues/253#issuecomment-544319254 link #257
[GitHub] [rocketmq-client-go] ShannonDing commented on issue #254: [Native] How to connect an instance of Alibaba Cloud RocketMQ, cuz client insists ip name servers
ShannonDing commented on issue #254: [Native] How to connect an instance of Alibaba Cloud RocketMQ, cuz client insists ip name servers URL: https://github.com/apache/rocketmq-client-go/issues/254#issuecomment-544318919 Refer to https://github.com/apache/rocketmq-client-go/blob/native/examples/producer/acl/main.go
[GitHub] [rocketmq-client-go] ShannonDing commented on issue #257: [Master] ReConsumeLater don't work as expected.
ShannonDing commented on issue #257: [Master] ReConsumeLater don't work as expected. URL: https://github.com/apache/rocketmq-client-go/issues/257#issuecomment-544318577 could you please share the log?
[GitHub] [rocketmq-client-go] zenghur opened a new issue #257: [Master] ReConsumeLater don't work as expected.
zenghur opened a new issue #257: [Master] ReConsumeLater don't work as expected. URL: https://github.com/apache/rocketmq-client-go/issues/257

1. Please describe the issue you observed:
- What did you do (The steps to reproduce)? Used the CommonProducer model to produce a common message and received the message, then returned "ReConsumeLater" for it.
- What did you expect to see? I expected to see the message again.
- What did you see instead? The message is gone.
2. Please tell us about your environment:
- What is your OS? Ubuntu 16.04
- What is your client version? f73c8d2cd6654d6b791f902ae77d758da21a330f
- What is your RocketMQ version? 9fa724eadfc78b61220ca4bdb314db45ecc830d5
[GitHub] [rocketmq] duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern
duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern
URL: https://github.com/apache/rocketmq/pull/1422#discussion_r336776616

## File path: client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java ##
@@ -76,6 +81,9 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
             case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                 return this.consumeMessageDirectly(ctx, request);
+
+            case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
+                return this.receiveReplyMssage(ctx, request);

Review comment: receiveReplyMssage -> receiveReplyMessage
[GitHub] [rocketmq] duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern
duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern
URL: https://github.com/apache/rocketmq/pull/1422#discussion_r336776831

## File path: client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java ##
@@ -213,4 +221,79 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
         return response;
     }
+
+    private RemotingCommand receiveReplyMssage(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        long receiveTime = System.currentTimeMillis();
+        ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class);
+
+        try {
+            MessageExt msg = new MessageExt();
+            msg.setTopic(requestHeader.getTopic());
+            msg.setQueueId(requestHeader.getQueueId());
+            msg.setStoreTimestamp(requestHeader.getStoreTimestamp());
+
+            if (requestHeader.getBornHost() != null) {
+                String[] bornHostArr = requestHeader.getBornHost().split("/");
+                String bornHost/*ip:port*/ = bornHostArr[bornHostArr.length - 1];
+                String[] host = bornHost.split(":");
+                if (host.length == 2)
+                    msg.setBornHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
+            }
+
+            if (requestHeader.getStoreHost() != null) {
+                String[] storeHostArr = requestHeader.getStoreHost().split("/");

Review comment: We may need to consider IPv6 addresses here (such as 2408:4004:0180:8100:3faa:1dde:2b3f:898a). For more details, see org.apache.rocketmq.common.sysflag.MessageSysFlag#BORNHOST_V6_FLAG and org.apache.rocketmq.store.CommitLog#putMessage.
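
To illustrate the IPv6 concern in the review comment above: `split(":")` yields two parts only for IPv4 or host names, so an IPv6 born/store host would be silently skipped by the `host.length == 2` check. One possible approach, shown here only as a sketch, is to split on the last colon instead. The class `HostPortParser` and its `parse` method are hypothetical names and not part of RocketMQ; the sketch assumes the input always carries a trailing `:port`, and it uses `InetSocketAddress.createUnresolved` (rather than the constructor used in the diff) so that no name lookup is attempted.

```java
import java.net.InetSocketAddress;

// Hypothetical helper, not part of RocketMQ: parses "host:port" strings
// where the host may be an IPv4 address, a host name, or an IPv6 literal.
public class HostPortParser {

    /**
     * Returns an unresolved socket address, or null if the input has no
     * usable ":port" suffix. Assumes a port is always present; a bare IPv6
     * address without a port cannot be told apart reliably by this sketch.
     */
    public static InetSocketAddress parse(String raw) {
        if (raw == null || raw.isEmpty()) {
            return null;
        }
        // Keep only the part after the last '/', mirroring split("/") in the diff.
        String hostPort = raw.substring(raw.lastIndexOf('/') + 1);

        // Split on the LAST colon so the colons inside an IPv6 literal stay in the host.
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostPort.length() - 1) {
            return null;
        }
        String host = hostPort.substring(0, idx);
        // Accept the bracketed form "[::1]:9876" as well.
        if (host.startsWith("[") && host.endsWith("]")) {
            host = host.substring(1, host.length() - 1);
        }

        int port;
        try {
            port = Integer.parseInt(hostPort.substring(idx + 1));
        } catch (NumberFormatException e) {
            return null;
        }
        if (port < 0 || port > 65535) {
            return null;
        }
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```

With this sketch, `HostPortParser.parse("/2408:4004:0180:8100:3faa:1dde:2b3f:898a:10911")` keeps the full IPv6 literal as the host and 10911 as the port, while an IPv4 input such as `10.0.0.1:9876` behaves as in the original two-part split.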
[GitHub] [rocketmq] duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern
duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern
URL: https://github.com/apache/rocketmq/pull/1422#discussion_r336777073

## File path: client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java ##
+    private void processReplyMessage(MessageExt replyMsg) {
+        final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
+        final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
+        if (requestResponseFuture != null) {
+            requestResponseFuture.putResponseMessage(replyMsg);
+
+            RequestFutureTable.getRequestFutureTable().remove(correlationId);
+
+            if (requestResponseFuture.getRequestCallback() != null) {
+                requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
+            } else {
+                requestResponseFuture.putResponseMessage(replyMsg);
+            }
+        } else {
+            log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s", correlationId));

Review comment: It would be nice to also print the born host and msgId here, to cover the case of receiving more than one reply because multiple consumer groups subscribed to the same request topic.
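
The suggestion in the review comment above can be sketched in isolation as follows. This is not the PR's code: `ReplyCorrelator`, `register`, and `onReply` are hypothetical names, and a plain `CompletableFuture<String>` stands in for the PR's `RequestResponseFuture`. The sketch only shows the idea of matching a reply to a pending request by correlation id and, when nothing matches (for example a second reply from another consumer group), producing a diagnostic that includes the msgId and born host.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not part of RocketMQ: correlates replies with
// pending requests and describes replies that match nothing.
public class ReplyCorrelator {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request and returns the future its reply will complete. */
    public CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /**
     * Completes the matching request, if any. Returns null on a match,
     * otherwise a diagnostic line the caller can pass to its logger.
     */
    public String onReply(String correlationId, String msgId, String bornHost, String body) {
        // remove() is atomic, so only the first reply for an id can match;
        // ConcurrentHashMap rejects null keys, hence the explicit guard.
        CompletableFuture<String> future =
            correlationId == null ? null : pending.remove(correlationId);
        if (future != null) {
            future.complete(body);
            return null;
        }
        return String.format(
            "receive reply message, but not matched any request, CorrelationId: %s, msgId: %s, bornHost: %s",
            correlationId, msgId, bornHost);
    }
}
```

In this sketch a duplicate reply for an already-answered request yields a message naming which reply (msgId) arrived and from where (born host), which is the tracing information the reviewer asks for.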