http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java index 0ff589d..38d885e 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java @@ -6,13 +6,15 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagew...@apache.org $ */ /** @@ -36,10 +38,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; - +import static org.junit.Assert.assertEquals; -public class SendMessageTest extends BrokerTestHarness{ +public class SendMessageTest extends BrokerTestHarness { MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig()); String topic = "UnitTestTopic"; @@ -60,7 +61,7 @@ public class SendMessageTest extends BrokerTestHarness{ } @Test - public void testSendSingle() throws Exception{ + public void testSendSingle() throws Exception { Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes()); SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup("abc"); @@ -74,7 +75,7 @@ public class SendMessageTest extends BrokerTestHarness{ requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5, - CommunicationMode.SYNC, new SendMessageContext(), null); + CommunicationMode.SYNC, new SendMessageContext(), null); assertEquals(result.getSendStatus(), SendStatus.SEND_OK); } }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java index d6be5fb..89813fc 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java @@ -6,13 +6,15 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z vintagew...@apache.org $ */ /** @@ -25,7 +27,6 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; - public class ConsumerOffsetManagerTest extends BrokerTestHarness { @Test http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java index ab9ab6f..2f85dbc 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java @@ -6,13 +6,15 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z vintagew...@apache.org $ */ /** @@ -25,8 +27,9 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.junit.Test; -import static org.junit.Assert.*; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; public class TopicConfigManagerTest extends BrokerTestHarness { @Test http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index e4a8c36..0d22d7d 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -1,7 +1,7 @@ <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE module PUBLIC - "-//Puppy Crawl//DTD Check Configuration 1.3//EN" - "http://www.puppycrawl.com/dtds/configuration_1_3.dtd"> + "-//Puppy Crawl//DTD Check Configuration 1.3//EN" + "http://www.puppycrawl.com/dtds/configuration_1_3.dtd"> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with @@ -32,8 +32,8 @@ </module> <module name="RegexpSingleline"> - <property name="format" value="System\.out\.println" /> - <property name="message" value="Prohibit invoking System.out.println in source code !" /> + <property name="format" value="System\.out\.println"/> + <property name="message" value="Prohibit invoking System.out.println in source code !"/> </module> <module name="RegexpSingleline"> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/pom.xml ---------------------------------------------------------------------- diff --git a/client/pom.xml b/client/pom.xml index 86d38cf..9c18ebd 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -15,7 +15,7 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.rocketmq</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 8afca13..9c7a0cd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client; @@ -20,7 +20,6 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.remoting.common.RemotingUtil; - /** * Client Common configuration * @@ -123,78 +122,64 @@ public class ClientConfig { return clientCallbackExecutorThreads; } - public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) { this.clientCallbackExecutorThreads = clientCallbackExecutorThreads; } - public int getPollNameServerInteval() { return pollNameServerInteval; } - public void setPollNameServerInteval(int pollNameServerInteval) { this.pollNameServerInteval = pollNameServerInteval; } - public int getHeartbeatBrokerInterval() { return heartbeatBrokerInterval; } - public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) { this.heartbeatBrokerInterval = heartbeatBrokerInterval; } - public int getPersistConsumerOffsetInterval() { return persistConsumerOffsetInterval; } - public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) { this.persistConsumerOffsetInterval = persistConsumerOffsetInterval; } - public String getUnitName() { return unitName; } - public void setUnitName(String unitName) { this.unitName = unitName; } - public boolean isUnitMode() { return unitMode; } - public void setUnitMode(boolean unitMode) { this.unitMode = unitMode; } - public boolean isVipChannelEnabled() { return vipChannelEnabled; } - public void setVipChannelEnabled(final boolean vipChannelEnabled) { this.vipChannelEnabled = vipChannelEnabled; } - @Override public String toString() { return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName - + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval - + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" - + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" - + vipChannelEnabled + "]"; + + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval + + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" + + vipChannelEnabled + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java index 6596855..7697520 100644 --- a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java +++ b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client; @@ -22,7 +22,6 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; - /** * Base interface for MQ management * @@ -41,8 +40,7 @@ public interface MQAdmin { * @throws MQClientException */ void createTopic(final String key, final String newTopic, final int queueNum) - throws MQClientException; - + throws MQClientException; /** * Creates an topic @@ -59,8 +57,7 @@ public interface MQAdmin { * @throws MQClientException */ void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) - throws MQClientException; - + throws MQClientException; /** * Gets the message queue offset according to some time in milliseconds<br> @@ -77,7 +74,6 @@ public interface MQAdmin { */ long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException; - /** * Gets the max offset * @@ -90,7 +86,6 @@ public interface MQAdmin { */ long maxOffset(final MessageQueue mq) throws MQClientException; - /** * Gets the minimum offset * @@ -103,7 +98,6 @@ public interface MQAdmin { */ long minOffset(final MessageQueue mq) throws MQClientException; - /** * Gets the earliest stored message time * @@ -116,7 +110,6 @@ public interface MQAdmin { */ long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException; - /** * Query message according tto message id * @@ -131,8 +124,7 @@ public interface MQAdmin { * @throws MQClientException */ MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; - + InterruptedException, MQClientException; /** * Query messages @@ -154,8 +146,8 @@ public interface MQAdmin { * @throws InterruptedException */ QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, - final long end) throws MQClientException, InterruptedException; - + final long end) throws MQClientException, InterruptedException; + /** * @param topic @@ -166,7 +158,6 @@ public interface MQAdmin { * @throws InterruptedException * @throws MQClientException */ - MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/MQHelper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/MQHelper.java b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java index b4ddb08..937e846 100644 --- a/client/src/main/java/org/apache/rocketmq/client/MQHelper.java +++ b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java @@ -16,47 +16,39 @@ */ package org.apache.rocketmq.client; +import java.util.Set; +import java.util.TreeSet; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.slf4j.Logger; -import java.util.Set; -import java.util.TreeSet; - - public class MQHelper { public static void resetOffsetByTimestamp( - final MessageModel messageModel, - final String consumerGroup, - final String topic, - final long timestamp) throws Exception { + final MessageModel messageModel, + final String consumerGroup, + final String topic, + final long timestamp) throws Exception { resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp); } /** * Reset consumer topic offset according to time * - * @param messageModel - * which model - * @param instanceName - * which instance - * @param consumerGroup - * consumer group - * @param topic - * topic - * @param timestamp - * time - * + * @param messageModel which model + * @param instanceName which instance + * @param consumerGroup consumer group + * @param topic topic + * @param timestamp time * @throws Exception */ public static void resetOffsetByTimestamp( - final MessageModel messageModel, - final String instanceName, - final String consumerGroup, - final String topic, - final long timestamp) throws Exception { + final MessageModel messageModel, + final String instanceName, + final String consumerGroup, + final String topic, + final long timestamp) throws Exception { final Logger log = ClientLogger.getLog(); DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup); @@ -74,7 +66,7 @@ public class MQHelper { if (offset >= 0) { consumer.updateConsumeOffset(mq, offset); log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}", - consumerGroup, offset, mq); + consumerGroup, offset, mq); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/QueryResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/QueryResult.java b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java index af3649b..7b1cc01 100644 --- a/client/src/main/java/org/apache/rocketmq/client/QueryResult.java +++ b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java @@ -6,45 +6,39 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client; -import org.apache.rocketmq.common.message.MessageExt; - import java.util.List; - +import org.apache.rocketmq.common.message.MessageExt; public class QueryResult { private final long indexLastUpdateTimestamp; private final List<MessageExt> messageList; - public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) { this.indexLastUpdateTimestamp = indexLastUpdateTimestamp; this.messageList = messageList; } - public long getIndexLastUpdateTimestamp() { return indexLastUpdateTimestamp; } - public List<MessageExt> getMessageList() { return messageList; } - @Override public String toString() { return "QueryResult [indexLastUpdateTimestamp=" + indexLastUpdateTimestamp + ", messageList=" - + messageList + "]"; + + messageList + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/Validators.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java index 92fc53b..fa9e4e6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/Validators.java +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -6,17 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.MixAll; @@ -24,10 +26,6 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.protocol.ResponseCode; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - - /** * Common Validator * @@ -65,8 +63,8 @@ public class Validators { } if (!regularExpressionMatcher(group, PATTERN)) { throw new MQClientException(String.format( - "the specified group[%s] contains illegal characters, allowing only %s", group, - VALID_PATTERN_STR), null); + "the specified group[%s] contains illegal characters, allowing only %s", group, + VALID_PATTERN_STR), null); } if (group.length() > CHARACTER_MAX_LENGTH) { throw new MQClientException("the specified group is longer than group max length 255.", null); @@ -97,7 +95,7 @@ public class Validators { * @throws MQClientException */ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) - throws MQClientException { + throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } @@ -114,7 +112,7 @@ public class Validators { if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, - "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); + "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } } @@ -132,8 +130,8 @@ public class Validators { if (!regularExpressionMatcher(topic, PATTERN)) { throw new MQClientException(String.format( - "the specified topic[%s] contains illegal characters, allowing only %s", topic, - VALID_PATTERN_STR), null); + "the specified topic[%s] contains illegal characters, allowing only %s", topic, + VALID_PATTERN_STR), null); } if (topic.length() > CHARACTER_MAX_LENGTH) { @@ -143,7 +141,7 @@ public class Validators { //whether the same with system reserved keyword if (topic.equals(MixAll.DEFAULT_TOPIC)) { throw new MQClientException( - String.format("the topic[%s] is conflict with default topic.", topic), null); + String.format("the topic[%s] is conflict with default topic.", topic), null); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java index bc4ca6c..913d4f2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java +++ b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.admin; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java index 360cfdf..391f1d1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java +++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.common; @@ -22,6 +22,7 @@ import java.util.Random; public class ThreadLocalIndex { private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>(); private final Random random = new Random(); + public ThreadLocalIndex(int value) { } @@ -30,7 +31,8 @@ public class ThreadLocalIndex { Integer index = this.threadLocalIndex.get(); if (null == index) { index = Math.abs(random.nextInt()); - if (index < 0) index = 0; + if (index < 0) + index = 0; this.threadLocalIndex.set(index); } @@ -45,7 +47,7 @@ public class ThreadLocalIndex { @Override public String toString() { return "ThreadLocalIndex{" + - "threadLocalIndex=" + threadLocalIndex.get() + - '}'; + "threadLocalIndex=" + threadLocalIndex.get() + + '}'; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java b/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java index 81a71e4..ca692d3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java @@ -16,39 +16,30 @@ */ package org.apache.rocketmq.client.consumer; -import org.apache.rocketmq.common.message.MessageQueue; - import java.util.List; - +import org.apache.rocketmq.common.message.MessageQueue; /** * Strategy Algorithm for message allocating between consumers - * */ public interface AllocateMessageQueueStrategy { /** * Allocating by consumer id * - * @param consumerGroup - * current consumer group - * @param currentCID - * current consumer id - * @param mqAll - * message queue set in current topic - * @param cidAll - * consumer set in current consumer group - * + * @param consumerGroup current consumer group + * @param currentCID current consumer id + * @param mqAll message queue set in current topic + * @param cidAll consumer set in current consumer group * @return The allocate result of given strategy */ List<MessageQueue> allocate( - final String consumerGroup, - final String currentCID, - final List<MessageQueue> mqAll, - final List<String> cidAll + final String consumerGroup, + final String currentCID, + final List<MessageQueue> mqAll, + final List<String> cidAll ); - /** * Algorithm name * http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index 156b3d0..8eb1258 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -6,16 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer; +import java.util.HashSet; +import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; @@ -31,10 +33,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; -import java.util.HashSet; -import java.util.Set; - - /** * Default pulling consumer * @@ -88,23 +86,19 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume private int maxReconsumeTimes = 16; - public DefaultMQPullConsumer() { this(MixAll.DEFAULT_CONSUMER_GROUP, null); } - public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) { this.consumerGroup = consumerGroup; defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook); } - public DefaultMQPullConsumer(final String consumerGroup) { this(consumerGroup, null); } - public DefaultMQPullConsumer(RPCHook rpcHook) { this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); } @@ -114,141 +108,116 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume createTopic(key, newTopic, queueNum, 0); } - @Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); } - @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp); } - @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPullConsumerImpl.maxOffset(mq); } - @Override public long minOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPullConsumerImpl.minOffset(mq); } - @Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq); } - @Override public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException { + InterruptedException, MQClientException { return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId); } - @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { + throws MQClientException, InterruptedException { return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, begin, end); } - public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { return allocateMessageQueueStrategy; } - public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; } - public long getBrokerSuspendMaxTimeMillis() { return brokerSuspendMaxTimeMillis; } - public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) { this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis; } - public String getConsumerGroup() { return consumerGroup; } - public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } - public long getConsumerPullTimeoutMillis() { return consumerPullTimeoutMillis; } - public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) { this.consumerPullTimeoutMillis = consumerPullTimeoutMillis; } - public long getConsumerTimeoutMillisWhenSuspend() { return consumerTimeoutMillisWhenSuspend; } - public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) { this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend; } - public MessageModel getMessageModel() { return messageModel; } - public void setMessageModel(MessageModel messageModel) { this.messageModel = messageModel; } - public MessageQueueListener getMessageQueueListener() { return messageQueueListener; } - public void setMessageQueueListener(MessageQueueListener messageQueueListener) { this.messageQueueListener = messageQueueListener; } - public Set<String> getRegisterTopics() { return registerTopics; } - public void setRegisterTopics(Set<String> registerTopics) { this.registerTopics = registerTopics; } - @Override public void sendMessageBack(MessageExt msg, int delayLevel) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null); } - @Override public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName); } @@ -279,37 +248,37 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Override public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums); } @Override public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout); } @Override public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback); } @Override public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout); } @Override public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums); } @Override public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback); } @@ -341,7 +310,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Override public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup); } @@ -349,32 +318,26 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume return offsetStore; } - public void setOffsetStore(OffsetStore offsetStore) { this.offsetStore = offsetStore; } - public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() { return defaultMQPullConsumerImpl; } - public boolean isUnitMode() { return unitMode; } - public void setUnitMode(boolean isUnitMode) { this.unitMode = isUnitMode; } - public int getMaxReconsumeTimes() { return maxReconsumeTimes; } - public void setMaxReconsumeTimes(final int maxReconsumeTimes) { this.maxReconsumeTimes = maxReconsumeTimes; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 228e075..fcb3e64 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -16,6 +16,9 @@ */ package org.apache.rocketmq.client.consumer; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.consumer.listener.MessageListener; @@ -36,14 +39,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - - /** * Wrapped push consumer.in fact,it works as remarkable as the pull consumer - * */ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; @@ -133,24 +130,20 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume private long suspendCurrentQueueTimeMillis = 1000; private long consumeTimeout = 15; - public DefaultMQPushConsumer() { this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); } - public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); } - public DefaultMQPushConsumer(RPCHook rpcHook) { this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); } - public DefaultMQPushConsumer(final String consumerGroup) { this(consumerGroup, null, new AllocateMessageQueueAveragely()); } @@ -160,46 +153,39 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume createTopic(key, newTopic, queueNum, 0); } - @Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); } - @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp); } - @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPushConsumerImpl.maxOffset(mq); } - @Override public long minOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPushConsumerImpl.minOffset(mq); } - @Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq); } - @Override public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId); } - @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { + throws MQClientException, InterruptedException { return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end); } @@ -218,169 +204,137 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume return allocateMessageQueueStrategy; } - public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; } - public int getConsumeConcurrentlyMaxSpan() { return consumeConcurrentlyMaxSpan; } - public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) { this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan; } - public ConsumeFromWhere getConsumeFromWhere() { return consumeFromWhere; } - public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { this.consumeFromWhere = consumeFromWhere; } - public int getConsumeMessageBatchMaxSize() { return consumeMessageBatchMaxSize; } - public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) { this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; } - public String getConsumerGroup() { return consumerGroup; } - public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } - public int getConsumeThreadMax() { return consumeThreadMax; } - public void setConsumeThreadMax(int consumeThreadMax) { this.consumeThreadMax = consumeThreadMax; } - public int getConsumeThreadMin() { return consumeThreadMin; } - public void setConsumeThreadMin(int consumeThreadMin) { this.consumeThreadMin = consumeThreadMin; } - public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() { return defaultMQPushConsumerImpl; } - public MessageListener getMessageListener() { return messageListener; } - public void setMessageListener(MessageListener messageListener) { this.messageListener = messageListener; } - public MessageModel getMessageModel() { return messageModel; } - public void setMessageModel(MessageModel messageModel) { this.messageModel = messageModel; } - public int getPullBatchSize() { return pullBatchSize; } - public void setPullBatchSize(int pullBatchSize) { this.pullBatchSize = pullBatchSize; } - public long getPullInterval() { return pullInterval; } - public void setPullInterval(long pullInterval) { this.pullInterval = pullInterval; } - public int getPullThresholdForQueue() { return pullThresholdForQueue; } - public void setPullThresholdForQueue(int pullThresholdForQueue) { this.pullThresholdForQueue = pullThresholdForQueue; } - public Map<String, String> getSubscription() { return subscription; } - public void setSubscription(Map<String, String> subscription) { this.subscription = subscription; } - @Override public void sendMessageBack(MessageExt msg, int delayLevel) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null); } - @Override public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName); } - @Override public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic); } - @Override public void start() throws MQClientException { this.defaultMQPushConsumerImpl.start(); } - @Override public void shutdown() { this.defaultMQPushConsumerImpl.shutdown(); } - @Override @Deprecated public void registerMessageListener(MessageListener messageListener) { @@ -388,127 +342,104 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); } - @Override public void registerMessageListener(MessageListenerConcurrently messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); } - @Override public void registerMessageListener(MessageListenerOrderly messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); } - @Override public void subscribe(String topic, String subExpression) throws MQClientException { this.defaultMQPushConsumerImpl.subscribe(topic, subExpression); } - @Override public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource); } - @Override public void unsubscribe(String topic) { this.defaultMQPushConsumerImpl.unsubscribe(topic); } - @Override public void updateCorePoolSize(int corePoolSize) { this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize); } - @Override public void suspend() { this.defaultMQPushConsumerImpl.suspend(); } - @Override public void resume() { this.defaultMQPushConsumerImpl.resume(); } - public OffsetStore getOffsetStore() { return offsetStore; } - public void setOffsetStore(OffsetStore offsetStore) { this.offsetStore = offsetStore; } - public String getConsumeTimestamp() { return consumeTimestamp; } - public void setConsumeTimestamp(String consumeTimestamp) { this.consumeTimestamp = consumeTimestamp; } - public boolean isPostSubscriptionWhenPull() { return postSubscriptionWhenPull; } - public void setPostSubscriptionWhenPull(boolean postSubscriptionWhenPull) { this.postSubscriptionWhenPull = postSubscriptionWhenPull; } - public boolean isUnitMode() { return unitMode; } - public void setUnitMode(boolean isUnitMode) { this.unitMode = isUnitMode; } - public long getAdjustThreadPoolNumsThreshold() { return adjustThreadPoolNumsThreshold; } - public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) { this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold; } - public int getMaxReconsumeTimes() { return maxReconsumeTimes; } - public void setMaxReconsumeTimes(final int maxReconsumeTimes) { this.maxReconsumeTimes = maxReconsumeTimes; } - public long getSuspendCurrentQueueTimeMillis() { return suspendCurrentQueueTimeMillis; } - public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) { this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; } - public long getConsumeTimeout() { return consumeTimeout; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java index 9d9c72b..343a0a2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java @@ -6,16 +6,17 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer; +import java.util.Set; import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -23,9 +24,6 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; -import java.util.Set; - - /** * Message queue consumer interface * @@ -44,8 +42,7 @@ public interface MQConsumer extends MQAdmin { */ @Deprecated void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException; - + MQBrokerException, InterruptedException, MQClientException; /** * If consuming failure,message will be send back to the broker,and delay consuming some time @@ -60,8 +57,7 @@ public interface MQConsumer extends MQAdmin { * @throws MQClientException */ void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - + throws RemotingException, MQBrokerException, InterruptedException, MQClientException; /** * Fetch message queues from consumer cache according to the topic http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java index 2335e3d..d199f8a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java @@ -6,25 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer; +import java.util.Set; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; -import java.util.Set; - - /** * Pulling consumer interface * @@ -37,13 +35,11 @@ public interface MQPullConsumer extends MQConsumer { */ void start() throws MQClientException; - /** * Shutdown the consumer */ void shutdown(); - /** * Register the message queue listener * @@ -52,7 +48,6 @@ public interface MQPullConsumer extends MQConsumer { */ void registerMessageQueueListener(final String topic, final MessageQueueListener listener); - /** * Pulling the messages,not blocking * @@ -74,9 +69,8 @@ public interface MQPullConsumer extends MQConsumer { * @throws RemotingException */ PullResult pull(final MessageQueue mq, final String subExpression, final long offset, - final int maxNums) throws MQClientException, RemotingException, MQBrokerException, - InterruptedException; - + final int maxNums) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; /** * Pulling the messages in the specified timeout @@ -95,9 +89,8 @@ public interface MQPullConsumer extends MQConsumer { * @throws InterruptedException */ PullResult pull(final MessageQueue mq, final String subExpression, final long offset, - final int maxNums, final long timeout) throws MQClientException, RemotingException, - MQBrokerException, InterruptedException; - + final int maxNums, final long timeout) throws MQClientException, RemotingException, + MQBrokerException, InterruptedException; /** * Pulling the messages in a async. way @@ -113,8 +106,8 @@ public interface MQPullConsumer extends MQConsumer { * @throws InterruptedException */ void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums, - final PullCallback pullCallback) throws MQClientException, RemotingException, - InterruptedException; + final PullCallback pullCallback) throws MQClientException, RemotingException, + InterruptedException; /** * Pulling the messages in a async. way @@ -131,9 +124,8 @@ public interface MQPullConsumer extends MQConsumer { * @throws InterruptedException */ void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums, - final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, - InterruptedException; - + final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, + InterruptedException; /** * Pulling the messages,if no message arrival,blocking some time @@ -151,9 +143,8 @@ public interface MQPullConsumer extends MQConsumer { * @throws InterruptedException */ PullResult pullBlockIfNotFound(final MessageQueue mq, final String subExpression, - final long offset, final int maxNums) throws MQClientException, RemotingException, - MQBrokerException, InterruptedException; - + final long offset, final int maxNums) throws MQClientException, RemotingException, + MQBrokerException, InterruptedException; /** * Pulling the messages through callback function,if no message arrival,blocking. @@ -169,9 +160,8 @@ public interface MQPullConsumer extends MQConsumer { * @throws InterruptedException */ void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset, - final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException, - InterruptedException; - + final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException, + InterruptedException; /** * Update the offset @@ -183,7 +173,6 @@ public interface MQPullConsumer extends MQConsumer { */ void updateConsumeOffset(final MessageQueue mq, final long offset) throws MQClientException; - /** * Fetch the offset * @@ -196,7 +185,6 @@ public interface MQPullConsumer extends MQConsumer { */ long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) throws MQClientException; - /** * Fetch the message queues according to the topic * @@ -224,5 +212,5 @@ public interface MQPullConsumer extends MQConsumer { * @throws MQClientException */ void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + throws RemotingException, MQBrokerException, InterruptedException, MQClientException; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java index da8ffb5..ec747e2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java @@ -16,34 +16,31 @@ */ package org.apache.rocketmq.client.consumer; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.slf4j.Logger; - import java.util.Iterator; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; - +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.slf4j.Logger; /** * Schedule service for pull consumer - * */ public class MQPullConsumerScheduleService { private final Logger log = ClientLogger.getLog(); private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl(); private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable = - new ConcurrentHashMap<MessageQueue, PullTaskImpl>(); + new ConcurrentHashMap<MessageQueue, PullTaskImpl>(); private DefaultMQPullConsumer defaultMQPullConsumer; private int pullThreadNums = 20; private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable = - new ConcurrentHashMap<String, PullTaskCallback>(); + new ConcurrentHashMap<String, PullTaskCallback>(); private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; public MQPullConsumerScheduleService(final String consumerGroup) { @@ -76,8 +73,8 @@ public class MQPullConsumerScheduleService { public void start() throws MQClientException { final String group = this.defaultMQPullConsumer.getConsumerGroup(); this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( - this.pullThreadNums, - new ThreadFactoryImpl("PullMsgThread-" + group) + this.pullThreadNums, + new ThreadFactoryImpl("PullMsgThread-" + group) ); this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener); @@ -85,7 +82,7 @@ public class MQPullConsumerScheduleService { this.defaultMQPullConsumer.start(); log.info("MQPullConsumerScheduleService start OK, {} {}", - this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable); + this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable); } public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) { @@ -139,7 +136,7 @@ public class MQPullConsumerScheduleService { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { MessageModel messageModel = - MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel(); + MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel(); switch (messageModel) { case BROADCASTING: MQPullConsumerScheduleService.this.putTask(topic, mqAll); @@ -157,18 +154,16 @@ public class MQPullConsumerScheduleService { private final MessageQueue messageQueue; private volatile boolean cancelled = false; - public PullTaskImpl(final MessageQueue messageQueue) { this.messageQueue = messageQueue; } - @Override public void run() { String topic = this.messageQueue.getTopic(); if (!this.isCancelled()) { PullTaskCallback pullTaskCallback = - MQPullConsumerScheduleService.this.callbackTable.get(topic); + MQPullConsumerScheduleService.this.callbackTable.get(topic); if (pullTaskCallback != null) { final PullTaskContext context = new PullTaskContext(); context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer); @@ -181,7 +176,7 @@ public class MQPullConsumerScheduleService { if (!this.isCancelled()) { MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this, - context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS); + context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS); } else { log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue); } @@ -193,17 +188,14 @@ public class MQPullConsumerScheduleService { } } - public boolean isCancelled() { return cancelled; } - public void setCancelled(boolean cancelled) { this.cancelled = cancelled; } - public MessageQueue getMessageQueue() { return messageQueue; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java index b04956c..1b969bd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer; @@ -21,7 +21,6 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; - /** * Push consumer * @@ -34,13 +33,11 @@ public interface MQPushConsumer extends MQConsumer { */ void start() throws MQClientException; - /** * Shutdown the consumer */ void shutdown(); - /** * Register the message listener * @@ -49,13 +46,10 @@ public interface MQPushConsumer extends MQConsumer { @Deprecated void registerMessageListener(MessageListener messageListener); - void registerMessageListener(final MessageListenerConcurrently messageListener); - void registerMessageListener(final MessageListenerOrderly messageListener); - /** * Subscribe some topic * @@ -69,7 +63,6 @@ public interface MQPushConsumer extends MQConsumer { */ void subscribe(final String topic, final String subExpression) throws MQClientException; - /** * Subscribe some topic * @@ -85,7 +78,6 @@ public interface MQPushConsumer extends MQConsumer { */ void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException; - /** * Unsubscribe consumption some topic * @@ -94,7 +86,6 @@ public interface MQPushConsumer extends MQConsumer { */ void unsubscribe(final String topic); - /** * Update the consumer thread pool size Dynamically * @@ -102,13 +93,11 @@ public interface MQPushConsumer extends MQConsumer { */ void updateCorePoolSize(int corePoolSize); - /** * Suspend the consumption */ void suspend(); - /** * Resume the consumption */ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java index 7a08348..0cc2dc4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java @@ -6,20 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer; -import org.apache.rocketmq.common.message.MessageQueue; - import java.util.Set; - +import org.apache.rocketmq.common.message.MessageQueue; /** * A MessageQueueListener is implemented by the application and may be specified when a message queue changed @@ -35,5 +33,5 @@ public interface MessageQueueListener { * collection of queues,assigned to the current consumer */ void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, - final Set<MessageQueue> mqDivided); + final Set<MessageQueue> mqDivided); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java index cf554c4..06e47d9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java index 1cb23ce..e494f74 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java @@ -6,20 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer; -import org.apache.rocketmq.common.message.MessageExt; - import java.util.List; - +import org.apache.rocketmq.common.message.MessageExt; public class PullResult { private final PullStatus pullStatus; @@ -28,9 +26,8 @@ public class PullResult { private final long maxOffset; private List<MessageExt> msgFoundList; - public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset, - List<MessageExt> msgFoundList) { + List<MessageExt> msgFoundList) { super(); this.pullStatus = pullStatus; this.nextBeginOffset = nextBeginOffset; @@ -39,41 +36,34 @@ public class PullResult { this.msgFoundList = msgFoundList; } - public PullStatus getPullStatus() { return pullStatus; } - public long getNextBeginOffset() { return nextBeginOffset; } - public long getMinOffset() { return minOffset; } - public long getMaxOffset() { return maxOffset; } - public List<MessageExt> getMsgFoundList() { return msgFoundList; } - public void setMsgFoundList(List<MessageExt> msgFoundList) { this.msgFoundList = msgFoundList; } - @Override public String toString() { return "PullResult [pullStatus=" + pullStatus + ", nextBeginOffset=" + nextBeginOffset - + ", minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", msgFoundList=" - + (msgFoundList == null ? 0 : msgFoundList.size()) + "]"; + + ", minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", msgFoundList=" + + (msgFoundList == null ? 0 : msgFoundList.size()) + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java index b2a3c8c..a400d90 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java index dc74bca..bc9a867 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer; import org.apache.rocketmq.common.message.MessageQueue; - public interface PullTaskCallback { void doPullTask(final MessageQueue mq, final PullTaskContext context); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java index ba66a1f..f0114ae 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer; @@ -22,22 +22,18 @@ public class PullTaskContext { private MQPullConsumer pullConsumer; - public int getPullNextDelayTimeMillis() { return pullNextDelayTimeMillis; } - public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) { this.pullNextDelayTimeMillis = pullNextDelayTimeMillis; } - public MQPullConsumer getPullConsumer() { return pullConsumer; } - public void setPullConsumer(MQPullConsumer pullConsumer) { this.pullConsumer = pullConsumer; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java index 981ceaf..40ac6c1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java @@ -18,10 +18,8 @@ package org.apache.rocketmq.client.consumer.listener; import org.apache.rocketmq.common.message.MessageQueue; - /** * Consumer concurrent consumption context - * */ public class ConsumeConcurrentlyContext { private final MessageQueue messageQueue; @@ -38,27 +36,22 @@ public class ConsumeConcurrentlyContext { this.messageQueue = messageQueue; } - public int getDelayLevelWhenNextConsume() { return delayLevelWhenNextConsume; } - public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) { this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; } - public MessageQueue getMessageQueue() { return messageQueue; } - public int getAckIndex() { return ackIndex; } - public void setAckIndex(int ackIndex) { this.ackIndex = ackIndex; }