[GitHub] vongosling commented on a change in pull request #358: [ISSUE #292] Add support of transactional message feature

2018-07-11 Thread GitBox
vongosling commented on a change in pull request #358: [ISSUE #292] Add support 
of transactional message feature 
URL: https://github.com/apache/rocketmq/pull/358#discussion_r201624161
 
 

 ##
 File path: 
broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
 ##
 @@ -185,4 +190,33 @@ public void unregisterProducer(final String group, final ClientChannelInfo clien
             log.error("", e);
         }
     }
+
+    public Channel getAvaliableChannel(String groupId) {
+        HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
+        List<Channel> channelList = new ArrayList<Channel>();
+        if (channelClientChannelInfoHashMap != null) {
+            for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
+                channelList.add(channel);
+            }
+            int size = channelList.size();
+            if (0 == size) {
+                log.warn("Channel list is empty. groupId={}", groupId);
+                return null;
+            }
+
+            int index = positiveAtomicCounter.incrementAndGet() % size;
+            Channel channel = channelList.get(index);
+            int count = 0;
+            boolean isOk = channel.isActive() && channel.isWritable();
+            while (isOk && count++ < 3) {
 
 Review comment:
   3 is a magic number; please extract it into a named constant.
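
   One way to address this is sketched below. It is not the code from the pull request: `SimpleChannel`, `FixedChannel`, `ChannelPicker` and `MAX_CHANNEL_PICK_ATTEMPTS` are hypothetical names, and `SimpleChannel` only stands in for Netty's `Channel` so the sketch compiles on its own. The point is just that the retry budget gets a name and a single place to change it.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for io.netty.channel.Channel, so the sketch needs no Netty.
interface SimpleChannel {
    boolean isActive();
    boolean isWritable();
}

// Hypothetical channel with fixed state, used only to exercise the picker.
class FixedChannel implements SimpleChannel {
    private final boolean usable;

    FixedChannel(boolean usable) {
        this.usable = usable;
    }

    public boolean isActive() {
        return usable;
    }

    public boolean isWritable() {
        return usable;
    }
}

class ChannelPicker {
    // Named constant replacing the bare literal 3 flagged in the review.
    // The name is an assumption, not taken from the pull request.
    private static final int MAX_CHANNEL_PICK_ATTEMPTS = 3;

    private final AtomicInteger counter = new AtomicInteger();

    // Returns the first usable channel found within the attempt budget, or null.
    SimpleChannel pick(List<SimpleChannel> channels) {
        int size = channels.size();
        if (size == 0) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), size);
        for (int attempt = 0; attempt < MAX_CHANNEL_PICK_ATTEMPTS; attempt++) {
            SimpleChannel candidate = channels.get(index);
            if (candidate.isActive() && candidate.isWritable()) {
                return candidate;
            }
            index = (index + 1) % size;
        }
        return null;
    }
}
```

   With the constant in place, the limit can later be made configurable without hunting for literals in the loop.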


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] vongosling commented on a change in pull request #358: [ISSUE #292] Add support of transactional message feature

2018-07-11 Thread GitBox
vongosling commented on a change in pull request #358: [ISSUE #292] Add support 
of transactional message feature 
URL: https://github.com/apache/rocketmq/pull/358#discussion_r201625122
 
 

 ##
 File path: 
broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
 ##
 @@ -14,9 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.rocketmq.broker.transaction;
-
+@Deprecated
 
 Review comment:
   Marking this as deprecated without stating a removal deadline and a 
replacement is not good practice.
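
   A minimal sketch of what such a deprecation notice could look like. `LegacyStore`, `QueueBackedStore` and the removal schedule are hypothetical and are not taken from the pull request. The plain `@Deprecated` annotation plus a Javadoc `@deprecated` tag works on older JDKs; on Java 9 and later the annotation also accepts `since` and `forRemoval` attributes.

```java
/**
 * Legacy storage contract, kept only for backward compatibility.
 *
 * @deprecated Superseded by {@link QueueBackedStore}. Planned for removal in
 *             the next major release (hypothetical schedule, for illustration).
 */
@Deprecated
interface LegacyStore {
    boolean put(String key);
}

/** Hypothetical replacement that the deprecation notice points readers to. */
interface QueueBackedStore {
    boolean append(String key);
}
```

   Callers then see both where to migrate and how long they have, directly in the compiler warning's Javadoc.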




[GitHub] vongosling commented on a change in pull request #358: [ISSUE #292] Add support of transactional message feature

2018-07-11 Thread GitBox
vongosling commented on a change in pull request #358: [ISSUE #292] Add support 
of transactional message feature 
URL: https://github.com/apache/rocketmq/pull/358#discussion_r20162
 
 

 ##
 File path: 
broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
 ##
 @@ -62,34 +60,22 @@ public Broker2Client(BrokerController brokerController) {
     }
 
     public void checkProducerTransactionState(
+        final String group,
         final Channel channel,
         final CheckTransactionStateRequestHeader requestHeader,
-        final SelectMappedBufferResult selectMappedBufferResult) {
+        final MessageExt messageExt) throws Exception {
         RemotingCommand request =
             RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
-        request.markOnewayRPC();
-
+        request.setBody(MessageDecoder.encode(messageExt, false));
         try {
-            FileRegion fileRegion =
-                new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
-                    selectMappedBufferResult);
-            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    selectMappedBufferResult.release();
-                    if (!future.isSuccess()) {
-                        log.error("invokeProducer failed,", future.cause());
-                    }
-                }
-            });
-        } catch (Throwable e) {
-            log.error("invokeProducer exception", e);
-            selectMappedBufferResult.release();
+            this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
 
 Review comment:
   10 ?




[GitHub] vongosling commented on a change in pull request #358: [ISSUE #292] Add support of transactional message feature

2018-07-11 Thread GitBox
vongosling commented on a change in pull request #358: [ISSUE #292] Add support 
of transactional message feature 
URL: https://github.com/apache/rocketmq/pull/358#discussion_r201625741
 
 

 ##
 File path: 
broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
 ##
 @@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.transaction.queue;
+
+import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
+import org.apache.rocketmq.broker.transaction.OperationResult;
+import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TransactionalMessageServiceImpl implements TransactionalMessageService {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
+
+    private TransactionalMessageBridge transactionBridge;
+
+    private static final int TRY_PULL_MSG_NUMBER = 1;
+
+    private final int queueTime = 6;
 
 Review comment:
   ?




[GitHub] vongosling commented on a change in pull request #358: [ISSUE #292] Add support of transactional message feature

2018-07-09 Thread GitBox
vongosling commented on a change in pull request #358: [ISSUE #292] Add support 
of transactional message feature 
URL: https://github.com/apache/rocketmq/pull/358#discussion_r200935682
 
 

 ##
 File path: 
broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
 ##
 @@ -18,7 +18,7 @@
 package org.apache.rocketmq.broker.transaction;
 
 import java.util.List;
-
+@Deprecated
 
 Review comment:
   ?




[GitHub] vongosling commented on a change in pull request #358: [ISSUE #292] Add support of transactional message feature

2018-07-09 Thread GitBox
vongosling commented on a change in pull request #358: [ISSUE #292] Add support 
of transactional message feature 
URL: https://github.com/apache/rocketmq/pull/358#discussion_r200929772
 
 

 ##
 File path: 
example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
 ##
 @@ -153,35 +150,37 @@ private static Message buildMessage(final int messageSize) throws UnsupportedEnc
     }
 }
 
-class TransactionExecuterBImpl implements LocalTransactionExecuter {
-
-    private boolean ischeck;
-
-    public TransactionExecuterBImpl(boolean ischeck) {
-        this.ischeck = ischeck;
-    }
-
-    @Override
-    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
-        if (ischeck) {
-            return LocalTransactionState.UNKNOW;
-        }
-        return LocalTransactionState.COMMIT_MESSAGE;
-    }
-}
-
-class TransactionCheckListenerBImpl implements TransactionCheckListener {
+//class TransactionExecuterBImpl implements LocalTransactionExecuter {
 
 Review comment:
   ?




[GitHub] vongosling commented on a change in pull request #358: [ISSUE #292] Add support of transactional message feature

2018-07-09 Thread GitBox
vongosling commented on a change in pull request #358: [ISSUE #292] Add support 
of transactional message feature 
URL: https://github.com/apache/rocketmq/pull/358#discussion_r200930371
 
 

 ##
 File path: 
client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
 ##
 @@ -17,7 +17,7 @@
 package org.apache.rocketmq.client.producer;
 
 import org.apache.rocketmq.common.message.Message;
-
+@Deprecated
 
 Review comment:
   ?




[GitHub] vongosling commented on a change in pull request #358: [ISSUE #292] Add support of transactional message feature

2018-07-09 Thread GitBox
vongosling commented on a change in pull request #358: [ISSUE #292] Add support 
of transactional message feature 
URL: https://github.com/apache/rocketmq/pull/358#discussion_r200929889
 
 

 ##
 File path: 
example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
 ##
 @@ -153,35 +150,37 @@ private static Message buildMessage(final int messageSize) throws UnsupportedEnc
     }
 }
 
-class TransactionExecuterBImpl implements LocalTransactionExecuter {
-
-    private boolean ischeck;
-
-    public TransactionExecuterBImpl(boolean ischeck) {
-        this.ischeck = ischeck;
-    }
-
-    @Override
-    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
-        if (ischeck) {
-            return LocalTransactionState.UNKNOW;
-        }
-        return LocalTransactionState.COMMIT_MESSAGE;
-    }
-}
-
-class TransactionCheckListenerBImpl implements TransactionCheckListener {
+//class TransactionExecuterBImpl implements LocalTransactionExecuter {
+//
+//    private boolean ischeckLocal;
+//
+//    public TransactionExecuterBImpl(boolean ischeck) {
+//        this.ischeck = ischeck;
+//    }
+//
+//    @Override
+//    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
+//        if (ischeck) {
+//            return LocalTransactionState.UNKNOW;
+//        }
+//        return LocalTransactionState.COMMIT_MESSAGE;
+//    }
+//}
+
+class TransactionListenerImpl implements TransactionListener {
     private boolean ischeckffalse;
     private StatsBenchmarkTProducer statsBenchmarkTProducer;
+    private boolean ischeckLocal;
 
-    public TransactionCheckListenerBImpl(boolean ischeckffalse,
-        StatsBenchmarkTProducer statsBenchmarkTProducer) {
+    public TransactionListenerImpl(boolean ischeckffalse, boolean isCheckLocal,
 
 Review comment:
   ischeckffalse ?




[GitHub] vongosling commented on a change in pull request #358: [ISSUE #292] Add support of transactional message feature

2018-07-09 Thread GitBox
vongosling commented on a change in pull request #358: [ISSUE #292] Add support 
of transactional message feature 
URL: https://github.com/apache/rocketmq/pull/358#discussion_r200930216
 
 

 ##
 File path: 
common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
 ##
 @@ -121,9 +118,14 @@ public void setTransactionId(String transactionId) {
 
     @Override
     public String toString() {
-        return "EndTransactionRequestHeader [producerGroup=" + producerGroup + ", tranStateTableOffset="
-            + tranStateTableOffset + ", commitLogOffset=" + commitLogOffset + ", commitOrRollback="
-            + commitOrRollback + ", fromTransactionCheck=" + fromTransactionCheck + ", msgId=" + msgId
-            + "]";
+        return "EndTransactionRequestHeader{" +
 
 Review comment:
   ToStringBuilder is an easier way to reflect all fields of a class.
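
   The comment presumably refers to Apache Commons Lang, where `ToStringBuilder.reflectionToString(this)` builds the string from all fields reflectively (that reading is an assumption). To show the idea without adding a dependency, the sketch below does the same thing with plain JDK reflection. `ReflectiveToString` and `SampleHeader` are hypothetical names, and the output format is not the one Commons Lang produces.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Comparator;
import java.util.StringJoiner;

// Hypothetical helper: renders every instance field of an object as name=value.
final class ReflectiveToString {
    private ReflectiveToString() {
    }

    static String describe(Object target) {
        Class<?> type = target.getClass();
        Field[] fields = type.getDeclaredFields();
        // Sort by name so the output does not depend on JVM field ordering.
        Arrays.sort(fields, Comparator.comparing(Field::getName));
        StringJoiner joiner = new StringJoiner(", ", type.getSimpleName() + "{", "}");
        for (Field field : fields) {
            // Skip static and compiler-generated fields; only instance state is printed.
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            field.setAccessible(true);
            try {
                joiner.add(field.getName() + "=" + field.get(target));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return joiner.toString();
    }
}

// Hypothetical header class; a field added later shows up in toString() automatically.
class SampleHeader {
    private String producerGroup = "groupA";
    private Long commitLogOffset = 42L;
    private String msgId = "m-1";

    @Override
    public String toString() {
        return ReflectiveToString.describe(this);
    }
}
```

   Calling `new SampleHeader().toString()` yields `SampleHeader{commitLogOffset=42, msgId=m-1, producerGroup=groupA}`. The trade-off against a hand-written concatenation is some reflection overhead in exchange for never forgetting a field.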

