[GitHub] [rocketmq] lijunyong commented on issue #362: deploy ha cluster: Slave fall behind master: 180 bytes

2019-10-20 Thread GitBox
lijunyong commented on issue #362: deploy ha cluster: Slave fall behind master: 
180 bytes
URL: https://github.com/apache/rocketmq/issues/362#issuecomment-544360464
 
 
   @shibd I ran into this too. How did you solve it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [rocketmq] coveralls edited a comment on issue #1422: [RIP-16]Support request/response pattern

2019-10-20 Thread GitBox
coveralls edited a comment on issue #1422: [RIP-16]Support request/response 
pattern
URL: https://github.com/apache/rocketmq/pull/1422#issuecomment-531618889
 
 
   
   [![Coverage Status](https://coveralls.io/builds/26429718/badge)](https://coveralls.io/builds/26429718)
   
   Coverage increased (+0.7%) to 50.23% when pulling **01827c8dd3a4e375cdea4e802f93c802ec43ea83 on qqeasonchen:rocketmq-dev-rpc** into **61f4f99b884d6b1d716414b14ed2e494d201bebe on apache:develop**.
   




[GitHub] [rocketmq-client-go] wolftankk opened a new issue #259: [native] close consumer lock when the client has shutdown

2019-10-20 Thread GitBox
wolftankk opened a new issue #259: [native] close consumer lock when the client 
has shutdown
URL: https://github.com/apache/rocketmq-client-go/issues/259
 
 
   




[GitHub] [rocketmq-client-go] zenghur commented on issue #257: [Master] ReConsumeLater don't work as expected.

2019-10-20 Thread GitBox
zenghur commented on issue #257: [Master] ReConsumeLater don't work as expected.
URL: https://github.com/apache/rocketmq-client-go/issues/257#issuecomment-544339649
 
 
   > could you please share the log?
   
   There is no log. Returning `ReConsumeLater` behaves like returning 
`ConsumeSuccess` and I haven't seen the message again.




[GitHub] [rocketmq-client-go] wolftankk opened a new pull request #258: consumerGroup cannot auto create

2019-10-20 Thread GitBox
wolftankk opened a new pull request #258: consumerGroup cannot auto create
URL: https://github.com/apache/rocketmq-client-go/pull/258
 
 
   For more details, see issue #194.




[GitHub] [rocketmq-operator] bix29 opened a new issue #13: does storageMode support ceph ?

2019-10-20 Thread GitBox
bix29 opened a new issue #13: does storageMode support ceph ?
URL: https://github.com/apache/rocketmq-operator/issues/13
 
 
   I want to use an existing Ceph cluster as storage. How do I configure it?




[GitHub] [rocketmq-externals] xujianhai666 opened a new issue #440: [Connector] Support partial task worker restart

2019-10-20 Thread GitBox
xujianhai666 opened a new issue #440: [Connector] Support partial task worker 
restart 
URL: https://github.com/apache/rocketmq-externals/issues/440
 
 
   The issue tracker is **ONLY** used for bug report and feature request. 
   
   Any question or RocketMQ proposal please use our [mailing 
lists](http://rocketmq.apache.org/about/contact/).
   
   **BUG REPORT**
   
   1. Please describe the issue you observed:
   
   - What did you do (The steps to reproduce)?
   
   - What did you expect to see?
   
   - What did you see instead?
   
   2. Please tell us about your environment:
   
   3. Other information (e.g. detailed explanation, logs, related issues, 
suggestions how to fix, etc):
   
   **FEATURE REQUEST**
   
   1. Please describe the feature you are requesting.
   
   2. Provide any additional detail on your proposed use case for this feature.
   
   3. Indicate the importance of this issue to you (blocker, must-have, 
should-have, nice-to-have). Are you currently using any workarounds to address 
this issue?
   
   4. If there are some sub-tasks using -[] for each subtask and create a 
corresponding issue to map to the sub task:
   
   - [sub-task1-issue-number](example_sub_issue1_link_here): sub-task1 
description here, 
   - [sub-task2-issue-number](example_sub_issue2_link_here): sub-task2 
description here,
   - ...




[GitHub] [rocketmq-externals] xujianhai666 opened a new issue #439: [Replicator] Add topic->task assign agl

2019-10-20 Thread GitBox
xujianhai666 opened a new issue #439: [Replicator] Add topic->task assign agl  
URL: https://github.com/apache/rocketmq-externals/issues/439
 
 
   The issue tracker is **ONLY** used for bug report and feature request. 
   
   Any question or RocketMQ proposal please use our [mailing 
lists](http://rocketmq.apache.org/about/contact/).
   
   **BUG REPORT**
   
   1. Please describe the issue you observed:
   
   - What did you do (The steps to reproduce)?
   
   - What did you expect to see?
   
   - What did you see instead?
   
   2. Please tell us about your environment:
   
   3. Other information (e.g. detailed explanation, logs, related issues, 
suggestions how to fix, etc):
   
   **FEATURE REQUEST**
   
   1. Please describe the feature you are requesting.
   
   2. Provide any additional detail on your proposed use case for this feature.
   
   3. Indicate the importance of this issue to you (blocker, must-have, 
should-have, nice-to-have). Are you currently using any workarounds to address 
this issue?
   
   4. If there are some sub-tasks using -[] for each subtask and create a 
corresponding issue to map to the sub task:
   
   - [sub-task1-issue-number](example_sub_issue1_link_here): sub-task1 
description here, 
   - [sub-task2-issue-number](example_sub_issue2_link_here): sub-task2 
description here,
   - ...




[GitHub] [rocketmq] YngJian commented on issue #1542: Some questions about the location of consumers and the automatic deletion of messages

2019-10-20 Thread GitBox
YngJian commented on issue #1542: Some questions about the location of 
consumers and the automatic deletion of messages
URL: https://github.com/apache/rocketmq/issues/1542#issuecomment-544327966
 
 
   
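   Thanks. One more question: my consumer is configured in broadcasting mode. Messages are sent successfully, but sometimes a message is not consumed successfully for a long time, and this happens fairly often. However, when I check the dead-letter queue in the console, the message is not there. What could be causing this?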




[GitHub] [rocketmq] duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern

2019-10-20 Thread GitBox
duhenglucky commented on a change in pull request #1422: [RIP-16]Support 
request/response pattern
URL: https://github.com/apache/rocketmq/pull/1422#discussion_r336819082
 
 

 ##
 File path: client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
 ##
 @@ -213,4 +221,79 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
 
         return response;
     }
+
+    private RemotingCommand receiveReplyMssage(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        long receiveTime = System.currentTimeMillis();
+        ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class);
+
+        try {
+            MessageExt msg = new MessageExt();
+            msg.setTopic(requestHeader.getTopic());
+            msg.setQueueId(requestHeader.getQueueId());
+            msg.setStoreTimestamp(requestHeader.getStoreTimestamp());
+
+            if (requestHeader.getBornHost() != null) {
+                String[] bornHostArr = requestHeader.getBornHost().split("/");
+                String bornHost/*ip:port*/ = bornHostArr[bornHostArr.length - 1];
+                String[] host = bornHost.split(":");
+                if (host.length == 2)
+                    msg.setBornHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
+            }
+
+            if (requestHeader.getStoreHost() != null) {
+                String[] storeHostArr = requestHeader.getStoreHost().split("/");
+                String storeHost = storeHostArr[storeHostArr.length - 1];
+                String[] host = storeHost.split(":");
+                if (host.length == 2)
+                    msg.setStoreHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
+            }
+
+            byte[] body = request.getBody();
+            if ((requestHeader.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
+                try {
+                    body = UtilAll.uncompress(body);
+                } catch (IOException e) {
+                    log.warn("err when uncompress constant", e);
+                }
+            }
+            msg.setBody(body);
+            msg.setFlag(requestHeader.getFlag());
+            MessageAccessor.setProperties(msg, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REPLY_MESSAGE_ARRIVE_TIME, String.valueOf(receiveTime));
+            msg.setBornTimestamp(requestHeader.getBornTimestamp());
+            msg.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+            log.debug("receive reply message :{}", msg);
+
+            processReplyMessage(msg);
+
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } catch (Exception e) {
+            log.warn("unknown err when receiveReplyMsg", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("process reply message fail");
+        }
+        return response;
+    }
+
+    private void processReplyMessage(MessageExt replyMsg) {
+        final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
+        final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
+        if (requestResponseFuture != null) {
+            requestResponseFuture.putResponseMessage(replyMsg);
+
+            RequestFutureTable.getRequestFutureTable().remove(correlationId);
+
+            if (requestResponseFuture.getRequestCallback() != null) {
+                requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
+            } else {
+                requestResponseFuture.putResponseMessage(replyMsg);
+            }
+        } else {
+            log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s", correlationId));
 
 Review comment:
   Just a suggestion: for tracing whether the reply is in line with 
expectations, the msgId of the reply message would probably be better.




[GitHub] [rocketmq] qqeasonchen commented on a change in pull request #1422: [RIP-16]Support request/response pattern

2019-10-20 Thread GitBox
qqeasonchen commented on a change in pull request #1422: [RIP-16]Support 
request/response pattern
URL: https://github.com/apache/rocketmq/pull/1422#discussion_r336818173
 
 

 ##
 File path: client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
 ##
 @@ -213,4 +221,79 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
 
         return response;
     }
+
+    private RemotingCommand receiveReplyMssage(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        long receiveTime = System.currentTimeMillis();
+        ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class);
+
+        try {
+            MessageExt msg = new MessageExt();
+            msg.setTopic(requestHeader.getTopic());
+            msg.setQueueId(requestHeader.getQueueId());
+            msg.setStoreTimestamp(requestHeader.getStoreTimestamp());
+
+            if (requestHeader.getBornHost() != null) {
+                String[] bornHostArr = requestHeader.getBornHost().split("/");
+                String bornHost/*ip:port*/ = bornHostArr[bornHostArr.length - 1];
+                String[] host = bornHost.split(":");
+                if (host.length == 2)
+                    msg.setBornHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
+            }
+
+            if (requestHeader.getStoreHost() != null) {
+                String[] storeHostArr = requestHeader.getStoreHost().split("/");
+                String storeHost = storeHostArr[storeHostArr.length - 1];
+                String[] host = storeHost.split(":");
+                if (host.length == 2)
+                    msg.setStoreHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
+            }
+
+            byte[] body = request.getBody();
+            if ((requestHeader.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
+                try {
+                    body = UtilAll.uncompress(body);
+                } catch (IOException e) {
+                    log.warn("err when uncompress constant", e);
+                }
+            }
+            msg.setBody(body);
+            msg.setFlag(requestHeader.getFlag());
+            MessageAccessor.setProperties(msg, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REPLY_MESSAGE_ARRIVE_TIME, String.valueOf(receiveTime));
+            msg.setBornTimestamp(requestHeader.getBornTimestamp());
+            msg.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+            log.debug("receive reply message :{}", msg);
+
+            processReplyMessage(msg);
+
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } catch (Exception e) {
+            log.warn("unknown err when receiveReplyMsg", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("process reply message fail");
+        }
+        return response;
+    }
+
+    private void processReplyMessage(MessageExt replyMsg) {
+        final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
+        final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
+        if (requestResponseFuture != null) {
+            requestResponseFuture.putResponseMessage(replyMsg);
+
+            RequestFutureTable.getRequestFutureTable().remove(correlationId);
+
+            if (requestResponseFuture.getRequestCallback() != null) {
+                requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
+            } else {
+                requestResponseFuture.putResponseMessage(replyMsg);
+            }
+        } else {
+            log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s", correlationId));
 
 Review comment:
   Is that the msgId of the request message or of the reply message?




[GitHub] [rocketmq-client-go] ShannonDing commented on issue #252: [Native] receiveResponse use to many memory

2019-10-20 Thread GitBox
ShannonDing commented on issue #252: [Native] receiveResponse use to many memory
URL: https://github.com/apache/rocketmq-client-go/issues/252#issuecomment-544319159
 
 
   link #256 




[GitHub] [rocketmq-client-go] ShannonDing commented on issue #253: V1.2.0 version consumer return ReConsumeLater but not msg reveive later

2019-10-20 Thread GitBox
ShannonDing commented on issue #253: V1.2.0 version consumer return 
ReConsumeLater but not msg reveive later
URL: https://github.com/apache/rocketmq-client-go/issues/253#issuecomment-544319254
 
 
   link #257 




[GitHub] [rocketmq-client-go] ShannonDing commented on issue #254: [Native] How to connect an instance of Alibaba Cloud RocketMQ, cuz client insists ip name servers

2019-10-20 Thread GitBox
ShannonDing commented on issue #254: [Native] How to connect an instance of 
Alibaba Cloud RocketMQ, cuz client insists ip name servers
URL: https://github.com/apache/rocketmq-client-go/issues/254#issuecomment-544318919
 
 
   Refer to https://github.com/apache/rocketmq-client-go/blob/native/examples/producer/acl/main.go




[GitHub] [rocketmq-client-go] ShannonDing commented on issue #257: [Master] ReConsumeLater don't work as expected.

2019-10-20 Thread GitBox
ShannonDing commented on issue #257: [Master] ReConsumeLater don't work as 
expected.
URL: https://github.com/apache/rocketmq-client-go/issues/257#issuecomment-544318577
 
 
   could you please share the log?




[GitHub] [rocketmq-client-go] zenghur opened a new issue #257: [Master] ReConsumeLater don't work as expected.

2019-10-20 Thread GitBox
zenghur opened a new issue #257: [Master] ReConsumeLater don't work as expected.
URL: https://github.com/apache/rocketmq-client-go/issues/257
 
 
   1. Please describe the issue you observed:
   
   - What did you do (The steps to reproduce)?
   
   I used the CommonProducer model to produce a common message and received 
the message, then returned "ReConsumeLater" for it.
   
   - What did you expect to see?
   
   I expected to see the message again.
   
   - What did you see instead?
   
   The message was gone.
   
   2. Please tell us about your environment:
   
- What is your OS?

   Ubuntu 16.04
   
- What is your client version?

  f73c8d2cd6654d6b791f902ae77d758da21a330f
   
- What is your RocketMQ version?
   
  9fa724eadfc78b61220ca4bdb314db45ecc830d5

   
   
   
   
   




[GitHub] [rocketmq] duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern

2019-10-20 Thread GitBox
duhenglucky commented on a change in pull request #1422: [RIP-16]Support 
request/response pattern
URL: https://github.com/apache/rocketmq/pull/1422#discussion_r336776616
 
 

 ##
 File path: client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
 ##
 @@ -76,6 +81,9 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
 
             case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                 return this.consumeMessageDirectly(ctx, request);
+
+            case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
+                return this.receiveReplyMssage(ctx, request);
 
 Review comment:
   receiveReplyMssage -> receiveReplyMessage




[GitHub] [rocketmq] duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern

2019-10-20 Thread GitBox
duhenglucky commented on a change in pull request #1422: [RIP-16]Support 
request/response pattern
URL: https://github.com/apache/rocketmq/pull/1422#discussion_r336776831
 
 

 ##
 File path: client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
 ##
 @@ -213,4 +221,79 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
 
         return response;
     }
+
+    private RemotingCommand receiveReplyMssage(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        long receiveTime = System.currentTimeMillis();
+        ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class);
+
+        try {
+            MessageExt msg = new MessageExt();
+            msg.setTopic(requestHeader.getTopic());
+            msg.setQueueId(requestHeader.getQueueId());
+            msg.setStoreTimestamp(requestHeader.getStoreTimestamp());
+
+            if (requestHeader.getBornHost() != null) {
+                String[] bornHostArr = requestHeader.getBornHost().split("/");
+                String bornHost/*ip:port*/ = bornHostArr[bornHostArr.length - 1];
+                String[] host = bornHost.split(":");
+                if (host.length == 2)
+                    msg.setBornHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
+            }
+
+            if (requestHeader.getStoreHost() != null) {
+                String[] storeHostArr = requestHeader.getStoreHost().split("/");
 
 Review comment:
   This may need to handle IPv6 addresses (for example 
2408:4004:0180:8100:3faa:1dde:2b3f:898a). For more details, see 
org.apache.rocketmq.common.sysflag.MessageSysFlag#BORNHOST_V6_FLAG and 
org.apache.rocketmq.store.CommitLog#putMessage.
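
To illustrate the IPv6 concern: splitting on every ':' yields more than two parts for an IPv6 literal, so the `host.length == 2` check in the diff would silently skip the address. The following is only a minimal sketch, assuming the header value always ends in `:port` and that splitting at the last colon is acceptable. `HostPortParser` is a hypothetical helper, not part of RocketMQ, and it uses `InetSocketAddress.createUnresolved` instead of the constructor used in the diff so that no name lookup happens.

```java
import java.net.InetSocketAddress;

// Hypothetical helper for illustration only; not part of the RocketMQ code base.
public final class HostPortParser {
    private HostPortParser() {
    }

    /**
     * Parses values such as "/192.168.0.1:10911", "name/10.0.0.1:9876" or
     * "/2408:4004:0180:8100:3faa:1dde:2b3f:898a:10911".
     * Assumption: the port is always the part after the LAST colon.
     * Returns null when the value cannot be parsed.
     */
    public static InetSocketAddress parse(String raw) {
        if (raw == null || raw.isEmpty()) {
            return null;
        }
        // Keep only the part after the last '/', as the code under review does.
        String hostPort = raw.substring(raw.lastIndexOf('/') + 1);

        // Split at the last colon so the colons inside an IPv6 literal are kept.
        int colon = hostPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostPort.length() - 1) {
            return null;
        }
        String host = hostPort.substring(0, colon);
        // Accept the bracketed form "[::1]:9876" as well.
        if (host.startsWith("[") && host.endsWith("]")) {
            host = host.substring(1, host.length() - 1);
        }
        if (host.isEmpty()) {
            return null;
        }

        int port;
        try {
            port = Integer.parseInt(hostPort.substring(colon + 1));
        } catch (NumberFormatException e) {
            return null;
        }
        if (port < 0 || port > 65535) {
            return null;
        }
        // createUnresolved keeps the host text as-is and performs no lookup.
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```

A bare IPv6 address without a port is ambiguous under this rule, so the sketch only fits if the sender always appends the port.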




[GitHub] [rocketmq] duhenglucky commented on a change in pull request #1422: [RIP-16]Support request/response pattern

2019-10-20 Thread GitBox
duhenglucky commented on a change in pull request #1422: [RIP-16]Support 
request/response pattern
URL: https://github.com/apache/rocketmq/pull/1422#discussion_r336777073
 
 

 ##
 File path: client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
 ##
 @@ -213,4 +221,79 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
 
         return response;
     }
+
+    private RemotingCommand receiveReplyMssage(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        long receiveTime = System.currentTimeMillis();
+        ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class);
+
+        try {
+            MessageExt msg = new MessageExt();
+            msg.setTopic(requestHeader.getTopic());
+            msg.setQueueId(requestHeader.getQueueId());
+            msg.setStoreTimestamp(requestHeader.getStoreTimestamp());
+
+            if (requestHeader.getBornHost() != null) {
+                String[] bornHostArr = requestHeader.getBornHost().split("/");
+                String bornHost/*ip:port*/ = bornHostArr[bornHostArr.length - 1];
+                String[] host = bornHost.split(":");
+                if (host.length == 2)
+                    msg.setBornHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
+            }
+
+            if (requestHeader.getStoreHost() != null) {
+                String[] storeHostArr = requestHeader.getStoreHost().split("/");
+                String storeHost = storeHostArr[storeHostArr.length - 1];
+                String[] host = storeHost.split(":");
+                if (host.length == 2)
+                    msg.setStoreHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
+            }
+
+            byte[] body = request.getBody();
+            if ((requestHeader.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
+                try {
+                    body = UtilAll.uncompress(body);
+                } catch (IOException e) {
+                    log.warn("err when uncompress constant", e);
+                }
+            }
+            msg.setBody(body);
+            msg.setFlag(requestHeader.getFlag());
+            MessageAccessor.setProperties(msg, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REPLY_MESSAGE_ARRIVE_TIME, String.valueOf(receiveTime));
+            msg.setBornTimestamp(requestHeader.getBornTimestamp());
+            msg.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+            log.debug("receive reply message :{}", msg);
+
+            processReplyMessage(msg);
+
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } catch (Exception e) {
+            log.warn("unknown err when receiveReplyMsg", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("process reply message fail");
+        }
+        return response;
+    }
+
+    private void processReplyMessage(MessageExt replyMsg) {
+        final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
+        final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
+        if (requestResponseFuture != null) {
+            requestResponseFuture.putResponseMessage(replyMsg);
+
+            RequestFutureTable.getRequestFutureTable().remove(correlationId);
+
+            if (requestResponseFuture.getRequestCallback() != null) {
+                requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
+            } else {
+                requestResponseFuture.putResponseMessage(replyMsg);
+            }
+        } else {
+            log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s", correlationId));
 
 Review comment:
   It would be nice to also print the born host and msgId here, for the case 
where more than one reply is received because multiple consumer groups are 
subscribed to the same request topic.
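
The situation described above can be sketched in isolation: the first reply for a correlation ID completes the pending request, and any later reply with the same ID finds nothing to match, so a log line carrying msgId and born host is what tells the duplicates apart. This is only an illustrative sketch under stated assumptions. `ReplyCorrelator` and its nested `Reply` class are hypothetical stand-ins built on `CompletableFuture`, not the `RequestFutureTable`, `RequestResponseFuture` or `MessageExt` classes from the pull request.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the request/reply bookkeeping; not RocketMQ code.
public final class ReplyCorrelator {

    /** Minimal reply model carrying only the fields discussed in the review. */
    public static final class Reply {
        public final String correlationId;
        public final String msgId;
        public final String bornHost;

        public Reply(String correlationId, String msgId, String bornHost) {
            this.correlationId = correlationId;
            this.msgId = msgId;
            this.bornHost = bornHost;
        }
    }

    // Pending requests keyed by correlation ID.
    private final Map<String, CompletableFuture<Reply>> pending = new ConcurrentHashMap<>();

    /** Registers a request and returns the future that its reply will complete. */
    public CompletableFuture<Reply> register(String correlationId) {
        CompletableFuture<Reply> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /**
     * Handles an incoming reply. Returns null when the reply matched a pending
     * request, otherwise the warning text a caller could pass to its logger.
     */
    public String onReply(Reply reply) {
        // remove() looks up and removes in one step, so only one reply can win.
        CompletableFuture<Reply> future = pending.remove(reply.correlationId);
        if (future != null) {
            future.complete(reply);
            return null;
        }
        return String.format(
            "receive reply message, but not matched any request, CorrelationId: %s, msgId: %s, bornHost: %s",
            reply.correlationId, reply.msgId, reply.bornHost);
    }
}
```

With two consumer groups replying to the same request, the second call to `onReply` returns the warning text, and the msgId and born host in it identify which consumer produced the extra reply.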

