http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 9c9b59e..42ce2f9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -51,7 +51,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; * </p> * * <p> - * <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe. + * <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe. * </p> */ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { @@ -90,29 +90,29 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * * There are three consuming points: * <ul> - * <li> - * <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously. - * If it were a newly booting up consumer client, according aging of the consumer group, there are two - * cases: - * <ol> - * <li> - * if the consumer group is created so recently that the earliest message being subscribed has yet - * expired, which means the consumer group represents a lately launched business, consuming will - * start from the very beginning; - * </li> - * <li> - * if the earliest message being subscribed has expired, consuming will start from the latest - * messages, meaning messages born prior to the booting timestamp would be ignored. - * </li> - * </ol> - * </li> - * <li> - * <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available. - * </li> - * <li> - * <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means - * messages born prior to {@link #consumeTimestamp} will be ignored - * </li> + * <li> + * <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously. + * If it were a newly booting up consumer client, according aging of the consumer group, there are two + * cases: + * <ol> + * <li> + * if the consumer group is created so recently that the earliest message being subscribed has yet + * expired, which means the consumer group represents a lately launched business, consuming will + * start from the very beginning; + * </li> + * <li> + * if the earliest message being subscribed has expired, consuming will start from the latest + * messages, meaning messages born prior to the booting timestamp would be ignored. + * </li> + * </ol> + * </li> + * <li> + * <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available. + * </li> + * <li> + * <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means + * messages born prior to {@link #consumeTimestamp} will be ignored + * </li> * </ul> */ private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; @@ -223,11 +223,13 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Constructor specifying consumer group, RPC hook and message queue allocating algorithm. + * * @param consumerGroup Consume queue. * @param rpcHook RPC hook to execute before each remoting command. * @param allocateMessageQueueStrategy message queue allocating algorithm. */ - public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); @@ -235,6 +237,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Constructor specifying RPC hook. + * * @param rpcHook RPC hook to execute before each remoting command. */ public DefaultMQPushConsumer(RPCHook rpcHook) { @@ -243,6 +246,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Constructor specifying consumer group. + * * @param consumerGroup Consumer group. */ public DefaultMQPushConsumer(final String consumerGroup) { @@ -280,7 +284,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume } @Override - public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage( + String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId); } @@ -291,7 +296,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume } @Override - public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage(String topic, + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { MessageDecoder.decodeMessageId(msgId); return this.viewMessage(msgId); @@ -411,6 +417,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Send message back to broker which will be re-delivered in future. + * * @param msg Message to send back. * @param delayLevel delay level. * @throws RemotingException if there is any network-tier error. @@ -449,6 +456,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * This method gets internal infrastructure readily to serve. Instances must call this method after configuration. + * * @throws MQClientException if there is any client error. */ @Override @@ -498,7 +506,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * * @param topic topic to subscribe. * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> - * if null or * expression,meaning subscribe all + * if null or * expression,meaning subscribe all * @throws MQClientException if there is any client error. */ @Override @@ -508,10 +516,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Subscribe a topic to consuming subscription. + * * @param topic topic to consume. * @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter * @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety - * @throws MQClientException */ @Override public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { @@ -521,12 +529,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Subscribe a topic by message selector. * - * @see org.apache.rocketmq.client.consumer.MessageSelector#bySql - * @see org.apache.rocketmq.client.consumer.MessageSelector#byTag - * * @param topic topic to consume. * @param messageSelector {@link org.apache.rocketmq.client.consumer.MessageSelector} - * @throws MQClientException + * @see org.apache.rocketmq.client.consumer.MessageSelector#bySql + * @see org.apache.rocketmq.client.consumer.MessageSelector#byTag */ @Override public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException { @@ -535,6 +541,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Un-subscribe the specified topic from subscription. + * * @param topic message topic */ @Override
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java index 03ee4d9..f4a8eda 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java @@ -30,13 +30,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException; public interface MQConsumer extends MQAdmin { /** * If consuming failure,message will be send back to the brokers,and delay consuming some time - * - * @param msg - * @param delayLevel - * @throws InterruptedException - * @throws MQBrokerException - * @throws RemotingException - * @throws MQClientException */ @Deprecated void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException, @@ -44,14 +37,6 @@ public interface MQConsumer extends MQAdmin { /** * If consuming failure,message will be send back to the broker,and delay consuming some time - * - * @param msg - * @param delayLevel - * @param brokerName - * @throws RemotingException - * @throws MQBrokerException - * @throws InterruptedException - * @throws MQClientException */ void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; @@ -61,7 +46,6 @@ public interface MQConsumer extends MQAdmin { * * @param topic message topic * @return queue set - * @throws MQClientException */ Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java index da43aa9..33002c9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java @@ -29,8 +29,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException; public interface MQPullConsumer extends MQConsumer { /** * Start the consumer - * - * @throws MQClientException */ void start() throws MQClientException; @@ -41,9 +39,6 @@ public interface MQPullConsumer extends MQConsumer { /** * Register the message queue listener - * - * @param topic - * @param listener */ void registerMessageQueueListener(final String topic, final MessageQueueListener listener); @@ -51,15 +46,12 @@ public interface MQPullConsumer extends MQConsumer { * Pulling the messages,not blocking * * @param mq from which message queue - * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if null or * expression,meaning subscribe + * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if + * null or * expression,meaning subscribe * all * @param offset from where to pull * @param maxNums max pulling numbers * @return The resulting {@code PullRequest} - * @throws MQClientException - * @throws InterruptedException - * @throws MQBrokerException - * @throws RemotingException */ PullResult pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums) throws MQClientException, RemotingException, MQBrokerException, @@ -68,16 +60,7 @@ public interface MQPullConsumer extends MQConsumer { /** * Pulling the messages in the specified timeout * - * @param mq - * @param subExpression - * @param offset - * @param maxNums - * @param timeout * @return The resulting {@code PullRequest} - * @throws MQClientException - * @throws RemotingException - * @throws MQBrokerException - * @throws InterruptedException */ PullResult pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums, final long timeout) throws MQClientException, RemotingException, @@ -85,15 +68,6 @@ public interface MQPullConsumer extends MQConsumer { /** * Pulling the messages in a async. way - * - * @param mq - * @param subExpression - * @param offset - * @param maxNums - * @param pullCallback - * @throws MQClientException - * @throws RemotingException - * @throws InterruptedException */ void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException, @@ -101,16 +75,6 @@ public interface MQPullConsumer extends MQConsumer { /** * Pulling the messages in a async. way - * - * @param mq - * @param subExpression - * @param offset - * @param maxNums - * @param pullCallback - * @param timeout - * @throws MQClientException - * @throws RemotingException - * @throws InterruptedException */ void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums, final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, @@ -119,15 +83,7 @@ public interface MQPullConsumer extends MQConsumer { /** * Pulling the messages,if no message arrival,blocking some time * - * @param mq - * @param subExpression - * @param offset - * @param maxNums * @return The resulting {@code PullRequest} - * @throws MQClientException - * @throws RemotingException - * @throws MQBrokerException - * @throws InterruptedException */ PullResult pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset, final int maxNums) throws MQClientException, RemotingException, @@ -135,15 +91,6 @@ public interface MQPullConsumer extends MQConsumer { /** * Pulling the messages through callback function,if no message arrival,blocking. - * - * @param mq - * @param subExpression - * @param offset - * @param maxNums - * @param pullCallback - * @throws MQClientException - * @throws RemotingException - * @throws InterruptedException */ void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset, final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException, @@ -151,20 +98,13 @@ public interface MQPullConsumer extends MQConsumer { /** * Update the offset - * - * @param mq - * @param offset - * @throws MQClientException */ void updateConsumeOffset(final MessageQueue mq, final long offset) throws MQClientException; /** * Fetch the offset * - * @param mq - * @param fromStore * @return The fetched offset of given queue - * @throws MQClientException */ long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) throws MQClientException; @@ -173,22 +113,12 @@ public interface MQPullConsumer extends MQConsumer { * * @param topic message topic * @return message queue set - * @throws MQClientException */ Set<MessageQueue> fetchMessageQueuesInBalance(final String topic) throws MQClientException; /** * If consuming failure,message will be send back to the broker,and delay consuming in some time later.<br> * Mind! message can only be consumed in the same group. - * - * @param msg - * @param delayLevel - * @param brokerName - * @param consumerGroup - * @throws RemotingException - * @throws MQBrokerException - * @throws InterruptedException - * @throws MQClientException */ void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java index 9c6c1f1..d56075c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java @@ -27,8 +27,6 @@ import org.apache.rocketmq.client.exception.MQClientException; public interface MQPushConsumer extends MQConsumer { /** * Start the consumer - * - * @throws MQClientException */ void start() throws MQClientException; @@ -39,8 +37,6 @@ public interface MQPushConsumer extends MQConsumer { /** * Register the message listener - * - * @param messageListener */ @Deprecated void registerMessageListener(MessageListener messageListener); @@ -52,22 +48,20 @@ public interface MQPushConsumer extends MQConsumer { /** * Subscribe some topic * - * @param topic - * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if null or * expression,meaning subscribe + * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if + * null or * expression,meaning subscribe * all - * @throws MQClientException */ void subscribe(final String topic, final String subExpression) throws MQClientException; /** * Subscribe some topic * - * @param topic * @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter * @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety - * @throws MQClientException */ - void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException; + void subscribe(final String topic, final String fullClassName, + final String filterClassSource) throws MQClientException; /** * Subscribe some topic with selector. @@ -84,9 +78,7 @@ public interface MQPushConsumer extends MQConsumer { * Choose SQL92: {@link MessageSelector#bySql(java.lang.String)} * </p> * - * @param topic * @param selector message selector({@link MessageSelector}), can be null. - * @throws MQClientException */ void subscribe(final String topic, final MessageSelector selector) throws MQClientException; @@ -99,8 +91,6 @@ public interface MQPushConsumer extends MQConsumer { /** * Update the consumer thread pool size Dynamically - * - * @param corePoolSize */ void updateCorePoolSize(int corePoolSize); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java index 35a5181..0398341 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java @@ -20,14 +20,13 @@ package org.apache.rocketmq.client.consumer; import org.apache.rocketmq.common.filter.ExpressionType; /** - * * Message selector: select message at server. * <p> - * Now, support: - * <li>Tag: {@link org.apache.rocketmq.common.filter.ExpressionType#TAG} - * </li> - * <li>SQL92: {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92} - * </li> + * Now, support: + * <li>Tag: {@link org.apache.rocketmq.common.filter.ExpressionType#TAG} + * </li> + * <li>SQL92: {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92} + * </li> * </p> */ public class MessageSelector { @@ -51,7 +50,6 @@ public class MessageSelector { * Use SLQ92 to select message. * * @param sql if null or empty, will be treated as select all message. - * @return */ public static MessageSelector bySql(String sql) { return new MessageSelector(ExpressionType.SQL92, sql); @@ -61,7 +59,6 @@ public class MessageSelector { * Use tag to select message. * * @param tag if null or empty or "*", will be treated as select all message. - * @return */ public static MessageSelector byTag(String tag) { return new MessageSelector(ExpressionType.TAG, tag); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java index 4eef1a8..cb39d7e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java @@ -24,10 +24,10 @@ import org.apache.rocketmq.common.message.MessageExt; */ public interface MessageListenerConcurrently extends MessageListener { /** - * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure + * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if + * consumption failure * * @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here - * @param context * @return The consume status */ ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java index 74dbb45..d148df5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java @@ -20,14 +20,15 @@ import java.util.List; import org.apache.rocketmq.common.message.MessageExt; /** - * A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one thread + * A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one + * thread */ public interface MessageListenerOrderly extends MessageListener { /** - * It is not recommend to throw exception,rather than returning ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT if consumption failure + * It is not recommend to throw exception,rather than returning ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT + * if consumption failure * * @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here - * @param context * @return The consume status */ ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs, http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java index 09d940a..b00326e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java @@ -30,7 +30,7 @@ import org.slf4j.Logger; /** * Consistent Hashing queue algorithm */ -public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy { +public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy { private final Logger log = ClientLogger.getLog(); private final int virtualNodeCnt; @@ -41,7 +41,7 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue } public AllocateMessageQueueConsistentHash(int virtualNodeCnt) { - this(virtualNodeCnt,null); + this(virtualNodeCnt, null); } public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) { @@ -75,7 +75,6 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue return result; } - Collection<ClientNode> cidNodes = new ArrayList<ClientNode>(); for (String cid : cidAll) { cidNodes.add(new ClientNode(cid)); @@ -105,7 +104,6 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue return "CONSISTENT_HASH"; } - private static class ClientNode implements Node { private final String clientID; @@ -119,6 +117,4 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue } } - - } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java index af745fc..9deed0e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java @@ -29,52 +29,37 @@ import org.apache.rocketmq.remoting.exception.RemotingException; public interface OffsetStore { /** * Load - * - * @throws MQClientException */ void load() throws MQClientException; /** * Update the offset,store it in memory - * - * @param mq - * @param offset - * @param increaseOnly */ void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly); /** * Get offset from local storage * - * @param mq - * @param type * @return The fetched offset */ long readOffset(final MessageQueue mq, final ReadOffsetType type); /** * Persist all offsets,may be in local storage or remote name server - * - * @param mqs */ void persistAll(final Set<MessageQueue> mqs); /** * Persist the offset,may be in local storage or remote name server - * - * @param mq */ void persist(final MessageQueue mq); /** * Remove offset - * - * @param mq */ void removeOffset(MessageQueue mq); /** - * @param topic * @return The cloned offset table of given topic */ Map<MessageQueue, Long> cloneOffsetTable(String topic); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index 2aadc89..69478cf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -55,7 +55,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { } @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand processRequest(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.CHECK_TRANSACTION_STATE: return this.checkTransactionState(ctx, request); @@ -82,7 +83,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { return false; } - public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class); final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody()); @@ -107,7 +109,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { return null; } - public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { try { final NotifyConsumerIdsChangedRequestHeader requestHeader = (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class); @@ -121,12 +124,13 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { return null; } - public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand resetOffset(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final ResetOffsetRequestHeader requestHeader = (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp()); + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getTimestamp()); Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); if (request.getBody() != null) { ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class); @@ -137,7 +141,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { } @Deprecated - public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetConsumerStatusRequestHeader requestHeader = (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); @@ -150,7 +155,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetConsumerRunningInfoRequestHeader requestHeader = (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); @@ -173,7 +179,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 92d8513..b582b81 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -240,7 +240,8 @@ public class MQAdminImpl { throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } - public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage( + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { MessageId messageId = null; try { @@ -252,12 +253,14 @@ public class MQAdminImpl { messageId.getOffset(), timeoutMillis); } - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, + long end) throws MQClientException, InterruptedException { return queryMessage(topic, key, maxNum, begin, end, false); } - public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws InterruptedException, MQClientException { + public MessageExt queryMessageByUniqKey(String topic, + String uniqKey) throws InterruptedException, MQClientException { QueryResult qr = this.queryMessage(topic, uniqKey, 32, MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true); @@ -268,7 +271,8 @@ public class MQAdminImpl { } } - protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, boolean isUniqKey) throws MQClientException, + protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, + boolean isUniqKey) throws MQClientException, InterruptedException { TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic); if (null == topicRouteData) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index abdad79..27b02da 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -62,7 +62,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { private final ScheduledExecutorService scheduledExecutorService; private volatile boolean stopped = false; - public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { + public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, + MessageListenerOrderly messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; @@ -204,7 +205,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } } - public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue, final long delayMills) { + public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue, + final long delayMills) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index ef27ff8..b555771 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -55,7 +55,8 @@ public abstract class RebalanceImpl { protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; protected MQClientInstance mQClientFactory; - public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, + public RebalanceImpl(String consumerGroup, MessageModel messageModel, + AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientFactory) { this.consumerGroup = consumerGroup; this.messageModel = messageModel; @@ -327,7 +328,8 @@ public abstract class RebalanceImpl { } } - private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { + private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, + final boolean isOrder) { boolean changed = false; Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); @@ -400,7 +402,8 @@ public abstract class RebalanceImpl { return changed; } - public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided); + public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, + final Set<MessageQueue> mqDivided); public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java index 57bfe2b..9dd408c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java @@ -32,7 +32,8 @@ public class RebalancePullImpl extends RebalanceImpl { this(null, null, null, null, defaultMQPullConsumerImpl); } - public RebalancePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, + public RebalancePullImpl(String consumerGroup, MessageModel messageModel, + AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) { super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); this.defaultMQPullConsumerImpl = defaultMQPullConsumerImpl; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 2f4f745..39e0251 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -40,7 +40,8 @@ public class RebalancePushImpl extends RebalanceImpl { this(null, null, null, null, defaultMQPushConsumerImpl); } - public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, + public RebalancePushImpl(String consumerGroup, MessageModel messageModel, + AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 6ef594b..31c2c3c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -584,7 +584,8 @@ public class MQClientInstance { } } - public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { + public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, + DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { @@ -719,7 +720,8 @@ public class MQClientInstance { return false; } - private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName, final String topic, + private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName, + final String topic, final String filterClassSource) throws UnsupportedEncodingException { byte[] classBody = null; int classCRC = 0; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 602fedd..db9e512 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -248,7 +248,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { } @Override - public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) { + public void checkTransactionState(final String addr, final MessageExt msg, + final CheckTransactionStateRequestHeader header) { Runnable request = new Runnable() { private final String brokerAddr = addr; private final MessageExt message = msg; @@ -386,7 +387,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); } - public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage( + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.makeSureStateOK(); return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); @@ -407,7 +409,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { /** * DEFAULT ASYNC ------------------------------------------------------- */ - public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } @@ -863,7 +866,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { /** * KERNEL ONEWAY ------------------------------------------------------- */ - public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { + public void sendOneway(Message msg, + MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -945,7 +949,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { } } - public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) + public TransactionSendResult sendMessageInTransaction(final Message msg, + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { if (null == tranExecuter) { throw new MQClientException("tranExecutor is null", null); @@ -1013,7 +1018,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { /** * DEFAULT SYNC ------------------------------------------------------- */ - public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + public SendResult send( + Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return send(msg, this.defaultMQProducer.getSendMsgTimeout()); } @@ -1054,7 +1060,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { this.defaultMQProducer.getSendMsgTimeout()); } - public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + public SendResult send(Message msg, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java b/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java index 05d72b4..356cda3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java +++ b/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java @@ -36,7 +36,7 @@ public class ClientLogger { private static Logger createLogger(final String loggerName) { String logConfigFilePath = System.getProperty("rocketmq.client.log.configFile", - System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE")); + System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE")); Boolean isloadconfig = Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true")); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 135a447..d48686e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -48,8 +48,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException; * </p> * * <p> - * <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe - * and used among multiple threads context. + * <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe + * and used among multiple threads context. * </p> */ public class DefaultMQProducer extends ClientConfig implements MQProducer { @@ -137,6 +137,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Constructor specifying producer group. + * * @param producerGroup Producer group, see the name-sake field. */ public DefaultMQProducer(final String producerGroup) { @@ -145,6 +146,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Constructor specifying the RPC hook. + * * @param rpcHook RPC hook to execute per each remoting command execution. */ public DefaultMQProducer(RPCHook rpcHook) { @@ -178,6 +180,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Fetch message queues of topic <code>topic</code>, to which we may send/publish messages. + * * @param topic Topic to fetch. * @return List of message queues readily to send messages to * @throws MQClientException if there is any client error. @@ -204,12 +207,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws InterruptedException if the sending thread is interrupted. */ @Override - public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + public SendResult send( + Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg); } /** * Same to {@link #send(Message)} with send timeout specified in addition. + * * @param msg Message to send. * @param timeout send timeout. * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, @@ -220,7 +225,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws InterruptedException if the sending thread is interrupted. */ @Override - public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + public SendResult send(Message msg, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, timeout); } @@ -234,6 +240,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * Similar to {@link #send(Message)}, internal implementation would potentially retry up to * {@link #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication * and application developers are the one to resolve this potential issue. + * * @param msg Message to send. * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. * @throws MQClientException if there is any client error. @@ -241,12 +248,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, sendCallback); } /** * Same to {@link #send(Message, SendCallback)} with send timeout specified in addition. + * * @param msg message to send. * @param sendCallback Callback to execute. * @param timeout send timeout. @@ -263,6 +272,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't wait for * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss. + * * @param msg Message to send. * @throws MQClientException if there is any client error. * @throws RemotingException if there is any network-tier error. @@ -275,6 +285,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #send(Message)} with target message queue specified in addition. + * * @param msg Message to send. * @param mq Target message queue. * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, @@ -327,6 +338,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #send(Message, SendCallback)} with target message queue and send timeout specified. + * * @param msg Message to send. * @param mq Target message queue. * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. @@ -343,6 +355,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #sendOneway(Message)} with target message queue specified. + * * @param msg Message to send. * @param mq Target message queue. * @throws MQClientException if there is any client error. @@ -350,12 +363,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { + public void sendOneway(Message msg, + MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.sendOneway(msg, mq); } /** - * Same to {@link #send(Message)} with message queue selector specified. + * Same to {@link #send(Message)} with message queue selector specified. * * @param msg Message to send. * @param selector Message queue selector, through which we get target message queue to deliver message to. @@ -430,6 +444,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #sendOneway(Message)} with message queue selector specified. + * * @param msg Message to send. * @param selector Message queue selector, through which to determine target message queue to deliver message * @param arg Argument used along with message queue selector. @@ -453,13 +468,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws MQClientException if there is any client error. */ @Override - public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg) + public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, + final Object arg) throws MQClientException { throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); } /** * Create a topic on broker. + * * @param key accesskey * @param newTopic topic name * @param queueNum topic's queue number @@ -472,6 +489,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Create a topic on broker. + * * @param key accesskey * @param newTopic topic name * @param queueNum topic's queue number @@ -485,6 +503,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Search consume queue offset of the given time stamp. + * * @param mq Instance of MessageQueue * @param timestamp from when in milliseconds. * @return Consume queue offset. @@ -509,6 +528,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query minimum offset of the given message queue. + * * @param mq Instance of MessageQueue * @return minimum offset of the given message queue. * @throws MQClientException if there is any client error. @@ -520,6 +540,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query earliest message store time. + * * @param mq Instance of MessageQueue * @return earliest message store time. * @throws MQClientException if there is any client error. @@ -531,6 +552,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query message of the given offset message ID. + * * @param offsetMsgId message id * @return Message specified. * @throws MQBrokerException if there is any broker error. @@ -539,12 +561,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws InterruptedException if the sending thread is interrupted. */ @Override - public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage( + String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return this.defaultMQProducerImpl.viewMessage(offsetMsgId); } /** * Query message by key. + * * @param topic message topic * @param key message key index word * @param maxNum max message number @@ -572,7 +596,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws InterruptedException if the sending thread is interrupted. */ @Override - public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage(String topic, + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { MessageId oldMsgId = MessageDecoder.decodeMessageId(msgId); return this.viewMessage(msgId); @@ -582,22 +607,26 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } @Override - public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + public SendResult send( + Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(batch(msgs)); } @Override - public SendResult send(Collection<Message> msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + public SendResult send(Collection<Message> msgs, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(batch(msgs), timeout); } @Override - public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + public SendResult send(Collection<Message> msgs, + MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(batch(msgs), messageQueue); } @Override - public SendResult send(Collection<Message> msgs, MessageQueue messageQueue, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + public SendResult send(Collection<Message> msgs, MessageQueue messageQueue, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout); } @@ -615,6 +644,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } return msgBatch; } + public String getProducerGroup() { return producerGroup; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java index fa96075..8094883 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java @@ -32,7 +32,8 @@ public class SendResult { public SendResult() { } - public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, long queueOffset) { + public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, + long queueOffset) { this.sendStatus = sendStatus; this.msgId = msgId; this.offsetMsgId = offsetMsgId; @@ -40,7 +41,8 @@ public class SendResult { this.queueOffset = queueOffset; } - public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, + public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, + final long queueOffset, final String transactionId, final String offsetMsgId, final String regionId) { this.sendStatus = sendStatus; this.msgId = msgId; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java index 3314261..9cdeda8 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java @@ -86,7 +86,8 @@ public class DefaultMQPullConsumerTest { @Test public void testPullMessage_Success() throws Exception { doAnswer(new Answer() { - @Override public Object answer(InvocationOnMock mock) throws Throwable { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { PullMessageRequestHeader requestHeader = mock.getArgument(1); return createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt())); } @@ -103,9 +104,10 @@ public class DefaultMQPullConsumerTest { } @Test - public void testPullMessage_NotFound() throws Exception{ + public void testPullMessage_NotFound() throws Exception { doAnswer(new Answer() { - @Override public Object answer(InvocationOnMock mock) throws Throwable { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { PullMessageRequestHeader requestHeader = mock.getArgument(1); return createPullResult(requestHeader, PullStatus.NO_NEW_MSG, new ArrayList<MessageExt>()); } @@ -119,7 +121,8 @@ public class DefaultMQPullConsumerTest { @Test public void testPullMessageAsync_Success() throws Exception { doAnswer(new Answer() { - @Override public Object answer(InvocationOnMock mock) throws Throwable { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { PullMessageRequestHeader requestHeader = mock.getArgument(1); PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt())); @@ -131,7 +134,8 @@ public class DefaultMQPullConsumerTest { MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0); pullConsumer.pull(messageQueue, "*", 1024, 3, new PullCallback() { - @Override public void onSuccess(PullResult pullResult) { + @Override + public void onSuccess(PullResult pullResult) { assertThat(pullResult).isNotNull(); assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1); @@ -140,13 +144,15 @@ public class DefaultMQPullConsumerTest { assertThat(pullResult.getMsgFoundList()).isEqualTo(new ArrayList<Object>()); } - @Override public void onException(Throwable e) { + @Override + public void onException(Throwable e) { } }); } - private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, List<MessageExt> messageExtList) throws Exception { + private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, + List<MessageExt> messageExtList) throws Exception { return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, new byte[] {}); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index cdf1d78..7e69cc1 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -90,7 +90,8 @@ public class DefaultMQPushConsumerTest { pushConsumer.setPullInterval(60 * 1000); pushConsumer.registerMessageListener(new MessageListenerConcurrently() { - @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return null; } @@ -109,7 +110,6 @@ public class DefaultMQPushConsumerTest { field.setAccessible(true); field.set(pushConsumerImpl, mQClientFactory); - field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); field.setAccessible(true); field.set(mQClientFactory, mQClientAPIImpl); @@ -125,21 +125,22 @@ public class DefaultMQPushConsumerTest { when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) .thenAnswer(new Answer<Object>() { - @Override public Object answer(InvocationOnMock mock) throws Throwable { - PullMessageRequestHeader requestHeader = mock.getArgument(1); - MessageClientExt messageClientExt = new MessageClientExt(); - messageClientExt.setTopic(topic); - messageClientExt.setQueueId(0); - messageClientExt.setMsgId("123"); - messageClientExt.setBody(new byte[] {'a'}); - messageClientExt.setOffsetMsgId("234"); - messageClientExt.setBornHost(new InetSocketAddress(8080)); - messageClientExt.setStoreHost(new InetSocketAddress(8080)); - PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt)); - ((PullCallback)mock.getArgument(4)).onSuccess(pullResult); - return pullResult; - } - }); + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + PullMessageRequestHeader requestHeader = mock.getArgument(1); + MessageClientExt messageClientExt = new MessageClientExt(); + messageClientExt.setTopic(topic); + messageClientExt.setQueueId(0); + messageClientExt.setMsgId("123"); + messageClientExt.setBody(new byte[] {'a'}); + messageClientExt.setOffsetMsgId("234"); + messageClientExt.setBornHost(new InetSocketAddress(8080)); + messageClientExt.setStoreHost(new InetSocketAddress(8080)); + PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt)); + ((PullCallback) mock.getArgument(4)).onSuccess(pullResult); + return pullResult; + } + }); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString()); @@ -159,7 +160,8 @@ public class DefaultMQPushConsumerTest { final CountDownLatch countDownLatch = new CountDownLatch(1); final MessageExt[] messageExts = new MessageExt[1]; pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { - @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { messageExts[0] = msgs.get(0); countDownLatch.countDown(); @@ -217,7 +219,8 @@ public class DefaultMQPushConsumerTest { return pullRequest; } - private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, List<MessageExt> messageExtList) throws Exception { + private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, + List<MessageExt> messageExtList) throws Exception { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); for (MessageExt messageExt : messageExtList) { outputStream.write(MessageDecoder.encode(messageExt, false)); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java index e9e5db7..8d091e5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java @@ -29,7 +29,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; - public class AllocateMessageQueueConsitentHashTest { private String topic; @@ -40,8 +39,6 @@ public class AllocateMessageQueueConsitentHashTest { topic = "topic_test"; } - - public void printMessageQueue(List<MessageQueue> messageQueueList, String name) { if (messageQueueList == null || messageQueueList.size() < 1) return; @@ -85,28 +82,27 @@ public class AllocateMessageQueueConsitentHashTest { @Test public void testAllocate1() { - testAllocate(20,10); + testAllocate(20, 10); } @Test public void testAllocate2() { - testAllocate(10,20); + testAllocate(10, 20); } - @Test - public void testRun100RandomCase(){ - for(int i=0;i<100;i++){ - int consumerSize = new Random().nextInt(200)+1;//1-200 - int queueSize = new Random().nextInt(100)+1;//1-100 - testAllocate(queueSize,consumerSize); + public void testRun100RandomCase() { + for (int i = 0; i < 100; i++) { + int consumerSize = new Random().nextInt(200) + 1;//1-200 + int queueSize = new Random().nextInt(100) + 1;//1-100 + testAllocate(queueSize, consumerSize); try { Thread.sleep(1); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + } } } - public void testAllocate(int queueSize, int consumerSize) { AllocateMessageQueueStrategy allocateMessageQueueConsistentHash = new AllocateMessageQueueConsistentHash(3); @@ -133,7 +129,7 @@ public class AllocateMessageQueueConsitentHashTest { } Assert.assertTrue( - verifyAllocateAll(cidBegin,mqAll, allocatedResAll)); + verifyAllocateAll(cidBegin, mqAll, allocatedResAll)); } Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<MessageQueue, String>(); @@ -162,7 +158,7 @@ public class AllocateMessageQueueConsitentHashTest { //System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString()); } - Assert.assertTrue("queueSize"+queueSize+"consumerSize:"+consumerSize+"\nmqAll:"+mqAll+"\nallocatedResAllAfterRemove"+allocatedResAllAfterRemove, + Assert.assertTrue("queueSize" + queueSize + "consumerSize:" + consumerSize + "\nmqAll:" + mqAll + "\nallocatedResAllAfterRemove" + allocatedResAllAfterRemove, verifyAllocateAll(cidAfterRemoveOne, mqAll, allocatedResAllAfterRemove)); verifyAfterRemove(allocateToAllOrigin, allocateToAllAfterRemoveOne, removeCID); } @@ -170,7 +166,7 @@ public class AllocateMessageQueueConsitentHashTest { List<String> cidAfterAdd = new ArrayList<String>(cidAfterRemoveOne); //test allocate add one more cid { - String newCid = CID_PREFIX+"NEW"; + String newCid = CID_PREFIX + "NEW"; //System.out.println("add one more cid "+newCid); cidAfterAdd.add(newCid); List<MessageQueue> mqShouldOnlyChanged = new ArrayList<MessageQueue>(); @@ -182,7 +178,7 @@ public class AllocateMessageQueueConsitentHashTest { allocatedResAllAfterAdd.addAll(rs); for (MessageQueue mq : rs) { allocateToAll3.put(mq, cid); - if (cid.equals(newCid)){ + if (cid.equals(newCid)) { mqShouldOnlyChanged.add(mq); } } @@ -190,19 +186,21 @@ public class AllocateMessageQueueConsitentHashTest { } Assert.assertTrue( - verifyAllocateAll(cidAfterAdd,mqAll, allocatedResAllAfterAdd)); + verifyAllocateAll(cidAfterAdd, mqAll, allocatedResAllAfterAdd)); verifyAfterAdd(allocateToAllAfterRemoveOne, allocateToAll3, newCid); } } - private boolean verifyAllocateAll(List<String> cidAll,List<MessageQueue> mqAll, List<MessageQueue> allocatedResAll) { - if (cidAll.isEmpty()){ + private boolean verifyAllocateAll(List<String> cidAll, List<MessageQueue> mqAll, + List<MessageQueue> allocatedResAll) { + if (cidAll.isEmpty()) { return allocatedResAll.isEmpty(); } return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll); } - private void verifyAfterRemove(Map<MessageQueue, String> allocateToBefore, Map<MessageQueue, String> allocateAfter, String removeCID) { + private void verifyAfterRemove(Map<MessageQueue, String> allocateToBefore, Map<MessageQueue, String> allocateAfter, + String removeCID) { for (MessageQueue mq : allocateToBefore.keySet()) { String allocateToOrigin = allocateToBefore.get(mq); if (allocateToOrigin.equals(removeCID)) { @@ -213,14 +211,15 @@ public class AllocateMessageQueueConsitentHashTest { } } - private void verifyAfterAdd(Map<MessageQueue, String> allocateBefore, Map<MessageQueue, String> allocateAfter, String newCID) { + private void verifyAfterAdd(Map<MessageQueue, String> allocateBefore, Map<MessageQueue, String> allocateAfter, + String newCID) { for (MessageQueue mq : allocateAfter.keySet()) { String allocateToOrigin = allocateBefore.get(mq); String allocateToAfter = allocateAfter.get(mq); if (allocateToAfter.equals(newCID)) { } else {//the rest queue should be the same - Assert.assertTrue("it was allocated to "+allocateToOrigin+". Now, it is to "+allocateAfter.get(mq)+" mq:"+mq,allocateAfter.get(mq).equals(allocateToOrigin));//should be the same + Assert.assertTrue("it was allocated to " + allocateToOrigin + ". Now, it is to " + allocateAfter.get(mq) + " mq:" + mq, allocateAfter.get(mq).equals(allocateToOrigin));//should be the same } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java index 81c394c..64d64f2 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java @@ -99,7 +99,8 @@ public class RemoteBrokerOffsetStoreTest { final MessageQueue messageQueue = new MessageQueue(topic, brokerName, 3); doAnswer(new Answer() { - @Override public Object answer(InvocationOnMock mock) throws Throwable { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { UpdateConsumerOffsetRequestHeader updateRequestHeader = mock.getArgument(1); when(mqClientAPI.queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong())).thenReturn(updateRequestHeader.getCommitOffset()); return null; @@ -123,8 +124,6 @@ public class RemoteBrokerOffsetStoreTest { assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025); } - - @Test public void testRemoveOffset() throws Exception { OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 3553738..bf01961 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -107,7 +107,8 @@ public class MQClientAPIImplTest { @Test public void testSendMessageSync_Success() throws InterruptedException, RemotingException, MQBrokerException { doAnswer(new Answer() { - @Override public Object answer(InvocationOnMock mock) throws Throwable { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { RemotingCommand request = mock.getArgument(1); return createSuccessResponse(request); } @@ -127,7 +128,8 @@ public class MQClientAPIImplTest { @Test public void testSendMessageSync_WithException() throws InterruptedException, RemotingException, MQBrokerException { doAnswer(new Answer() { - @Override public Object answer(InvocationOnMock mock) throws Throwable { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { RemotingCommand request = mock.getArgument(1); RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); response.setCode(ResponseCode.SYSTEM_ERROR); @@ -156,7 +158,8 @@ public class MQClientAPIImplTest { assertThat(sendResult).isNull(); doAnswer(new Answer() { - @Override public Object answer(InvocationOnMock mock) throws Throwable { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { InvokeCallback callback = mock.getArgument(3); RemotingCommand request = mock.getArgument(1); ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(), 3 * 1000, null, null); @@ -169,14 +172,16 @@ public class MQClientAPIImplTest { sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer())); mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC, new SendCallback() { - @Override public void onSuccess(SendResult sendResult) { + @Override + public void onSuccess(SendResult sendResult) { assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); assertThat(sendResult.getQueueOffset()).isEqualTo(123L); assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1); } - @Override public void onException(Throwable e) { + @Override + public void onException(Throwable e) { } }, null, null, 0, sendMessageContext, defaultMQProducerImpl); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java index 7e0b4f9..171a95a 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java @@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock; @RunWith(MockitoJUnitRunner.class) public class MQClientInstanceTest { - private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); private String topic = "FooBar"; private String group = "FooBarGroup";