http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java deleted file mode 100644 index 1c93b02..0000000 --- a/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ - */ -package com.alibaba.rocketmq.broker.topic; - -import com.alibaba.rocketmq.broker.BrokerTestHarness; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.TopicConfig; -import org.junit.Test; - -import static org.junit.Assert.*; - - -/** - * @author zander - */ -public class TopicConfigManagerTest extends BrokerTestHarness { - @Test - public void testFlushTopicConfig() throws Exception { - TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController); - - for (int i = 0; i < 10; i++) { - String topic = "UNITTEST-" + i; - TopicConfig topicConfig = topicConfigManager.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0); - assertNotNull(topicConfig); - } - topicConfigManager.persist(); - - topicConfigManager.getTopicConfigTable().clear(); - - for (int i = 0; i < 10; i++) { - String topic = "UNITTEST-" + i; - TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic); - assertNull(topicConfig); - } - topicConfigManager.load(); - for (int i = 0; i < 10; i++) { - String topic = "UNITTEST-" + i; - TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic); - assertNotNull(topicConfig); - assertEquals(topicConfig.getTopicSysFlag(), 0); - assertEquals(topicConfig.getReadQueueNums(), 4); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java new file mode 100644 index 0000000..79f82a6 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.broker; + +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author shtykh_roman + */ +public class BrokerControllerTest { + protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class); + + private static final int RESTART_NUM = 3; + + /** + * Tests if the controller can be properly stopped and started. + * + * @throws Exception If fails. + */ + @Test + public void testRestart() throws Exception { + + for (int i = 0; i < RESTART_NUM; i++) { + BrokerController brokerController = new BrokerController(// + new BrokerConfig(), // + new NettyServerConfig(), // + new NettyClientConfig(), // + new MessageStoreConfig()); + boolean initResult = brokerController.initialize(); + Assert.assertTrue(initResult); + logger.info("Broker is initialized " + initResult); + brokerController.start(); + logger.info("Broker is started"); + + brokerController.shutdown(); + logger.info("Broker is stopped"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java new file mode 100644 index 0000000..4b4fd95 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package org.apache.rocketmq.broker; + +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Random; + +/** + * @author zander + */ +public class BrokerTestHarness { + + protected BrokerController brokerController = null; + + protected Random random = new Random(); + public final String BROKER_NAME = "TestBrokerName"; + protected String brokerAddr = ""; + protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class); + protected BrokerConfig brokerConfig = new BrokerConfig(); + protected NettyServerConfig nettyServerConfig = new NettyServerConfig(); + protected NettyClientConfig nettyClientConfig = new NettyClientConfig(); + protected MessageStoreConfig storeConfig = new MessageStoreConfig(); + + @Before + public void startup() throws Exception { + brokerConfig.setBrokerName(BROKER_NAME); + brokerConfig.setBrokerIP1("127.0.0.1"); + 
storeConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore"); + storeConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + "commitlog"); + nettyServerConfig.setListenPort(10000 + random.nextInt(1000)); + brokerAddr = brokerConfig.getBrokerIP1() + ":" + nettyServerConfig.getListenPort(); + brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); + boolean initResult = brokerController.initialize(); + Assert.assertTrue(initResult); + logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr()); + brokerController.start(); + } + + @After + public void shutdown() throws Exception { + if (brokerController != null) { + brokerController.shutdown(); + } + //maybe need to clean the file store. But we do not suggest deleting anything. + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java new file mode 100644 index 0000000..9988a7c --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package org.apache.rocketmq.broker.api; + +import org.apache.rocketmq.broker.BrokerTestHarness; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + + +/** + * @author zander + */ +public class SendMessageTest extends BrokerTestHarness{ + + MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig()); + String topic = "UnitTestTopic"; + + @Before + @Override + public void startup() throws Exception { + super.startup(); + client.start(); + + } + + @After + @Override + public void shutdown() throws Exception { + client.shutdown(); + super.shutdown(); + } + + @Test + public void testSendSingle() throws Exception{ + Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes()); + SendMessageRequestHeader requestHeader = new 
SendMessageRequestHeader(); + requestHeader.setProducerGroup("abc"); + requestHeader.setTopic(msg.getTopic()); + requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC); + requestHeader.setDefaultTopicQueueNums(4); + requestHeader.setQueueId(0); + requestHeader.setSysFlag(0); + requestHeader.setBornTimestamp(System.currentTimeMillis()); + requestHeader.setFlag(msg.getFlag()); + requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); + + SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5, + CommunicationMode.SYNC, new SendMessageContext(), null); + assertEquals(result.getSendStatus(), SendStatus.SEND_OK); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java new file mode 100644 index 0000000..cdbddf9 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package org.apache.rocketmq.broker.offset; + +import org.apache.rocketmq.broker.BrokerTestHarness; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + +/** + * @author zander + */ +public class ConsumerOffsetManagerTest extends BrokerTestHarness { + + @Test + public void testFlushConsumerOffset() throws Exception { + ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController); + for (int i = 0; i < 10; i++) { + String group = "UNIT_TEST_GROUP_" + i; + for (int id = 0; id < 10; id++) { + consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id, id + 100); + consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id, id + 100); + consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id, id + 100); + } + } + consumerOffsetManager.persist(); + consumerOffsetManager.getOffsetTable().clear(); + for (int i = 0; i < 10; i++) { + String group = "UNIT_TEST_GROUP_" + i; + for (int id = 0; id < 10; id++) { + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), -1); + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1); + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_C", id), -1); + } + } + consumerOffsetManager.load(); + for (int i = 0; i < 10; i++) { + String group = "UNIT_TEST_GROUP_" + i; + for (int id = 0; id < 10; id++) { + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), id + 100); + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100); + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_C", id), id + 100); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java 
---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java new file mode 100644 index 0000000..1de17e6 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package org.apache.rocketmq.broker.topic; + +import org.apache.rocketmq.broker.BrokerTestHarness; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.junit.Test; + +import static org.junit.Assert.*; + + +/** + * @author zander + */ +public class TopicConfigManagerTest extends BrokerTestHarness { + @Test + public void testFlushTopicConfig() throws Exception { + TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController); + + for (int i = 0; i < 10; i++) { + String topic = "UNITTEST-" + i; + TopicConfig topicConfig = topicConfigManager.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0); + assertNotNull(topicConfig); + } + topicConfigManager.persist(); + + topicConfigManager.getTopicConfigTable().clear(); + + for (int i = 0; i < 10; i++) { + String topic = "UNITTEST-" + i; + TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic); + assertNull(topicConfig); + } + topicConfigManager.load(); + for (int i = 0; i < 10; i++) { + String topic = "UNITTEST-" + i; + TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic); + assertNotNull(topicConfig); + assertEquals(topicConfig.getTopicSysFlag(), 0); + assertEquals(topicConfig.getReadQueueNums(), 4); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/pom.xml ---------------------------------------------------------------------- diff --git a/client/pom.xml b/client/pom.xml index 63a6114..86d38cf 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -18,7 +18,7 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <groupId>com.alibaba.rocketmq</groupId> + <groupId>org.apache.rocketmq</groupId> 
<artifactId>rocketmq-all</artifactId> <version>4.0.0-SNAPSHOT</version> </parent> @@ -56,19 +56,19 @@ <includes> <include>com.alibaba:fastjson</include> <include>io.netty:netty-all</include> - <include>com.alibaba.rocketmq:rocketmq-client</include> - <include>com.alibaba.rocketmq:rocketmq-common</include> - <include>com.alibaba.rocketmq:rocketmq-remoting</include> + <include>org.apache.rocketmq:rocketmq-client</include> + <include>org.apache.rocketmq:rocketmq-common</include> + <include>org.apache.rocketmq:rocketmq-remoting</include> </includes> </artifactSet> <relocations> <relocation> <pattern>io.netty</pattern> - <shadedPattern>com.alibaba.rocketmq.shade.io.netty</shadedPattern> + <shadedPattern>org.apache.rocketmq.shade.io.netty</shadedPattern> </relocation> <relocation> <pattern>com.alibaba.fastjson</pattern> - <shadedPattern>com.alibaba.rocketmq.shade.com.alibaba.fastjson</shadedPattern> + <shadedPattern>org.apache.rocketmq.shade.com.alibaba.fastjson</shadedPattern> </relocation> </relocations> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java b/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java deleted file mode 100644 index 4d80564..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java +++ /dev/null @@ -1,202 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client; - -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.remoting.common.RemotingUtil; - - -/** - * Client Common configuration - * - * @author shijia.wxr - * @author vongosling - */ -public class ClientConfig { - public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel"; - private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); - private String clientIP = RemotingUtil.getLocalAddress(); - private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT"); - private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors(); - /** - * Pulling topic information interval from the named server - */ - private int pollNameServerInteval = 1000 * 30; - /** - * Heartbeat interval in microseconds with message broker - */ - private int heartbeatBrokerInterval = 1000 * 30; - /** - * Offset persistent interval for consumer - */ - private int persistConsumerOffsetInterval = 1000 * 5; - private boolean unitMode = false; - private String unitName; - private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); - - public String buildMQClientId() { - StringBuilder sb = new StringBuilder(); - sb.append(this.getClientIP()); - - sb.append("@"); - sb.append(this.getInstanceName()); - if (!UtilAll.isBlank(this.unitName)) { - sb.append("@"); - 
sb.append(this.unitName); - } - - return sb.toString(); - } - - public String getClientIP() { - return clientIP; - } - - public void setClientIP(String clientIP) { - this.clientIP = clientIP; - } - - public String getInstanceName() { - return instanceName; - } - - public void setInstanceName(String instanceName) { - this.instanceName = instanceName; - } - - public void changeInstanceNameToPID() { - if (this.instanceName.equals("DEFAULT")) { - this.instanceName = String.valueOf(UtilAll.getPid()); - } - } - - public void resetClientConfig(final ClientConfig cc) { - this.namesrvAddr = cc.namesrvAddr; - this.clientIP = cc.clientIP; - this.instanceName = cc.instanceName; - this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads; - this.pollNameServerInteval = cc.pollNameServerInteval; - this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval; - this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval; - this.unitMode = cc.unitMode; - this.unitName = cc.unitName; - this.vipChannelEnabled = cc.vipChannelEnabled; - } - - public ClientConfig cloneClientConfig() { - ClientConfig cc = new ClientConfig(); - cc.namesrvAddr = namesrvAddr; - cc.clientIP = clientIP; - cc.instanceName = instanceName; - cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads; - cc.pollNameServerInteval = pollNameServerInteval; - cc.heartbeatBrokerInterval = heartbeatBrokerInterval; - cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval; - cc.unitMode = unitMode; - cc.unitName = unitName; - cc.vipChannelEnabled = vipChannelEnabled; - return cc; - } - - public String getNamesrvAddr() { - return namesrvAddr; - } - - public void setNamesrvAddr(String namesrvAddr) { - this.namesrvAddr = namesrvAddr; - } - - public int getClientCallbackExecutorThreads() { - return clientCallbackExecutorThreads; - } - - - public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) { - this.clientCallbackExecutorThreads = clientCallbackExecutorThreads; - 
} - - - public int getPollNameServerInteval() { - return pollNameServerInteval; - } - - - public void setPollNameServerInteval(int pollNameServerInteval) { - this.pollNameServerInteval = pollNameServerInteval; - } - - - public int getHeartbeatBrokerInterval() { - return heartbeatBrokerInterval; - } - - - public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) { - this.heartbeatBrokerInterval = heartbeatBrokerInterval; - } - - - public int getPersistConsumerOffsetInterval() { - return persistConsumerOffsetInterval; - } - - - public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) { - this.persistConsumerOffsetInterval = persistConsumerOffsetInterval; - } - - - public String getUnitName() { - return unitName; - } - - - public void setUnitName(String unitName) { - this.unitName = unitName; - } - - - public boolean isUnitMode() { - return unitMode; - } - - - public void setUnitMode(boolean unitMode) { - this.unitMode = unitMode; - } - - - public boolean isVipChannelEnabled() { - return vipChannelEnabled; - } - - - public void setVipChannelEnabled(final boolean vipChannelEnabled) { - this.vipChannelEnabled = vipChannelEnabled; - } - - - @Override - public String toString() { - return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName - + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval - + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" - + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" - + vipChannelEnabled + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java 
b/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java deleted file mode 100644 index 4e202e9..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client; - -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.remoting.exception.RemotingException; - - -/** - * Base interface for MQ management - * - * @author shijia.wxr - */ -public interface MQAdmin { - /** - * Creates an topic - * - * @param key - * accesskey - * @param newTopic - * topic name - * @param queueNum - * topic's queue number - * - * @throws MQClientException - */ - void createTopic(final String key, final String newTopic, final int queueNum) - throws MQClientException; - - - /** - * Creates an topic - * - * @param key - * accesskey - * @param newTopic - * topic name - * @param queueNum - * topic's queue number - * @param topicSysFlag - * topic system flag - * - * @throws MQClientException - */ - void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) - throws MQClientException; - - - /** - * Gets the message queue offset according to some time in milliseconds<br> - * be cautious to call because of more IO overhead - * - * @param mq - * Instance of MessageQueue - * @param timestamp - * from when in milliseconds. 
- * - * @return offset - * - * @throws MQClientException - */ - long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException; - - - /** - * Gets the max offset - * - * @param mq - * Instance of MessageQueue - * - * @return the max offset - * - * @throws MQClientException - */ - long maxOffset(final MessageQueue mq) throws MQClientException; - - - /** - * Gets the minimum offset - * - * @param mq - * Instance of MessageQueue - * - * @return the minimum offset - * - * @throws MQClientException - */ - long minOffset(final MessageQueue mq) throws MQClientException; - - - /** - * Gets the earliest stored message time - * - * @param mq - * Instance of MessageQueue - * - * @return the time in microseconds - * - * @throws MQClientException - */ - long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException; - - - /** - * Query message according tto message id - * - * @param offsetMsgId - * message id - * - * @return message - * - * @throws InterruptedException - * @throws MQBrokerException - * @throws RemotingException - * @throws MQClientException - */ - MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; - - - /** - * Query messages - * - * @param topic - * message topic - * @param key - * message key index word - * @param maxNum - * max message number - * @param begin - * from when - * @param end - * to when - * - * @return Instance of QueryResult - * - * @throws MQClientException - * @throws InterruptedException - */ - QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, - final long end) throws MQClientException, InterruptedException; - - /** - - * @param topic - * @param msgId - * @return The {@code MessageExt} of given msgId - * @throws RemotingException - * @throws MQBrokerException - * @throws InterruptedException - * @throws MQClientException - */ - MessageExt viewMessage(String topic, String 
msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java b/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java deleted file mode 100644 index 5934b49..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import org.slf4j.Logger; - -import java.util.Set; -import java.util.TreeSet; - - -/** - * @author shijia.wxr - */ -public class MQHelper { - public static void resetOffsetByTimestamp( - final MessageModel messageModel, - final String consumerGroup, - final String topic, - final long timestamp) throws Exception { - resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp); - } - - /** - * Reset consumer topic offset according to time - * - * @param messageModel - * which model - * @param instanceName - * which instance - * @param consumerGroup - * consumer group - * @param topic - * topic - * @param timestamp - * time - * - * @throws Exception - */ - public static void resetOffsetByTimestamp( - final MessageModel messageModel, - final String instanceName, - final String consumerGroup, - final String topic, - final long timestamp) throws Exception { - final Logger log = ClientLogger.getLog(); - - DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup); - consumer.setInstanceName(instanceName); - consumer.setMessageModel(messageModel); - consumer.start(); - - Set<MessageQueue> mqs = null; - try { - mqs = consumer.fetchSubscribeMessageQueues(topic); - if (mqs != null && !mqs.isEmpty()) { - TreeSet<MessageQueue> mqsNew = new TreeSet<MessageQueue>(mqs); - for (MessageQueue mq : mqsNew) { - long offset = consumer.searchOffset(mq, timestamp); - if (offset >= 0) { - consumer.updateConsumeOffset(mq, offset); - log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}", - consumerGroup, offset, mq); - } - } - } - } catch (Exception e) { - log.warn("resetOffsetByTimestamp Exception", e); - throw e; - } finally { - if (mqs != null) { - 
consumer.getDefaultMQPullConsumerImpl().getOffsetStore().persistAll(mqs); - } - consumer.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java b/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java deleted file mode 100644 index 43c8106..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client; - -import com.alibaba.rocketmq.common.message.MessageExt; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class QueryResult { - private final long indexLastUpdateTimestamp; - private final List<MessageExt> messageList; - - - public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) { - this.indexLastUpdateTimestamp = indexLastUpdateTimestamp; - this.messageList = messageList; - } - - - public long getIndexLastUpdateTimestamp() { - return indexLastUpdateTimestamp; - } - - - public List<MessageExt> getMessageList() { - return messageList; - } - - - @Override - public String toString() { - return "QueryResult [indexLastUpdateTimestamp=" + indexLastUpdateTimestamp + ", messageList=" - + messageList + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/Validators.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/Validators.java b/client/src/main/java/com/alibaba/rocketmq/client/Validators.java deleted file mode 100644 index 203aae0..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/Validators.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.client; - -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.producer.DefaultMQProducer; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.protocol.ResponseCode; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - - -/** - * Common Validator - * - * @author manhong.yqd - */ -public class Validators { - public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; - public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR); - public static final int CHARACTER_MAX_LENGTH = 255; - - /** - * @param origin - * @param patternStr - * - * @return The resulting {@code String} - */ - public static String getGroupWithRegularExpression(String origin, String patternStr) { - Pattern pattern = Pattern.compile(patternStr); - Matcher matcher = pattern.matcher(origin); - while (matcher.find()) { - return matcher.group(0); - } - return null; - } - - /** - * Validate group - * - * @param group - * - * @throws com.alibaba.rocketmq.client.exception.MQClientException - */ - public static void checkGroup(String group) throws MQClientException { - if (UtilAll.isBlank(group)) { - throw new MQClientException("the specified group is blank", null); - } - if (!regularExpressionMatcher(group, PATTERN)) { - throw new MQClientException(String.format( - "the specified group[%s] contains illegal characters, allowing only %s", group, - 
VALID_PATTERN_STR), null); - } - if (group.length() > CHARACTER_MAX_LENGTH) { - throw new MQClientException("the specified group is longer than group max length 255.", null); - } - } - - /** - * @param origin - * @param pattern - * - * @return <tt>true</tt> if, and only if, the entire origin sequence - * matches this matcher's pattern - */ - public static boolean regularExpressionMatcher(String origin, Pattern pattern) { - if (pattern == null) { - return true; - } - Matcher matcher = pattern.matcher(origin); - return matcher.matches(); - } - - /** - * Validate message - * - * @param msg - * @param defaultMQProducer - * - * @throws com.alibaba.rocketmq.client.exception.MQClientException - */ - public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) - throws MQClientException { - if (null == msg) { - throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); - } - // topic - Validators.checkTopic(msg.getTopic()); - // body - if (null == msg.getBody()) { - throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); - } - - if (0 == msg.getBody().length) { - throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); - } - - if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { - throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, - "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); - } - } - - /** - * Validate topic - * - * @param topic - * - * @throws com.alibaba.rocketmq.client.exception.MQClientException - */ - public static void checkTopic(String topic) throws MQClientException { - if (UtilAll.isBlank(topic)) { - throw new MQClientException("the specified topic is blank", null); - } - - if (!regularExpressionMatcher(topic, PATTERN)) { - throw new MQClientException(String.format( - "the specified topic[%s] contains illegal characters, allowing only %s", topic, - VALID_PATTERN_STR), null); - } - - if 
(topic.length() > CHARACTER_MAX_LENGTH) { - throw new MQClientException("the specified topic is longer than topic max length 255.", null); - } - - //whether the same with system reserved keyword - if (topic.equals(MixAll.DEFAULT_TOPIC)) { - throw new MQClientException( - String.format("the topic[%s] is conflict with default topic.", topic), null); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java b/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java deleted file mode 100644 index 071a872..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Marker interface: it declares no methods and no constants.
 * <p>
 * NOTE(review): which classes implement it, and what the marker is used for, is not
 * visible from this file - confirm against its implementors before relying on it.
 *
 * @author shijia.wxr
 */
public interface MQAdminExtInner {

}
/**
 * Integer constants in the range 10001-10005 identifying client-side error conditions.
 * <p>
 * NOTE(review): the meanings below are read off the constant names only; where each code
 * is raised is not visible from this file - confirm against the call sites.
 */
public class ClientErrorCode {
    // Presumably: connecting to a broker failed.
    public static final int CONNECT_BROKER_EXCEPTION = 10001;
    // Presumably: a broker request timed out.
    public static final int ACCESS_BROKER_TIMEOUT = 10002;
    // Presumably: the addressed broker is unknown.
    public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
    // Presumably: no name server is available.
    public static final int NO_NAME_SERVER_EXCEPTION = 10004;
    // Presumably: the topic could not be found.
    public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
}
/**
 * Per-thread, non-negative counter whose starting point is chosen randomly the first time
 * a thread uses it.
 */
public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    /**
     * @param value
     *         accepted but not used by this constructor
     */
    public ThreadLocalIndex(int value) {
    }

    /**
     * Advances the calling thread's counter by one and returns the new value.
     * <p>
     * On a thread's first call the counter starts from a random non-negative number. Whenever
     * {@code Math.abs} yields a negative result (the {@code Integer.MIN_VALUE} case) the value
     * is reset to {@code 0}, so the returned value is never negative.
     *
     * @return the incremented, non-negative counter value of the calling thread
     */
    public int getAndIncrement() {
        Integer current = threadLocalIndex.get();
        if (current == null) {
            // First use on this thread: start from a random non-negative seed.
            final int seed = Math.abs(random.nextInt());
            current = seed < 0 ? 0 : seed;
        }

        // Math.abs(Integer.MIN_VALUE) stays negative, which covers the overflow of MAX_VALUE + 1.
        int next = Math.abs(current + 1);
        if (next < 0) {
            next = 0;
        }

        threadLocalIndex.set(next);
        return next;
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("ThreadLocalIndex{");
        text.append("threadLocalIndex=").append(threadLocalIndex.get());
        text.append('}');
        return text.toString();
    }
}
/**
 * Strategy Algorithm for message allocating between consumers.
 * <p>
 * An implementation decides which message queues of a topic are assigned to one consumer
 * of a consumer group, and exposes a name identifying the algorithm.
 *
 * @author shijia.wxr
 * @author vongosling
 */
public interface AllocateMessageQueueStrategy {

    /**
     * Allocating by consumer id
     *
     * @param consumerGroup
     *         current consumer group
     * @param currentCID
     *         current consumer id
     * @param mqAll
     *         message queue set in current topic
     * @param cidAll
     *         consumer set in current consumer group
     *
     * @return The allocate result of given strategy
     */
    List<MessageQueue> allocate(
            final String consumerGroup,
            final String currentCID,
            final List<MessageQueue> mqAll,
            final List<String> cidAll
    );


    /**
     * Algorithm name
     *
     * @return The strategy name
     */
    String getName();
}
/**
 * Default pulling consumer.
 * <p>
 * This class holds the consumer's configuration (group, timeouts, message model, listener,
 * offset store, registered topics, allocation strategy) and forwards every operation to the
 * {@link DefaultMQPullConsumerImpl} created in the constructor.
 *
 * @author shijia.wxr
 */
public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
    /** Implementation object every operation of this class is delegated to. */
    protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;

    /**
     * Consumer group name. Consumers of the same group do the same thing; the application
     * must set it and guarantee it is globally unique.
     */
    private String consumerGroup;
    /**
     * Long polling mode: the maximum time the consumer connection is suspended.
     * It is not recommended to modify. Default is 20 * 1000.
     */
    private long brokerSuspendMaxTimeMillis = 1000 * 20;
    /**
     * Long polling mode: the consumer connection timeout (must be greater than
     * brokerSuspendMaxTimeMillis). It is not recommended to modify. Default is 30 * 1000.
     */
    private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
    /**
     * The socket timeout in milliseconds. Default is 10 * 1000.
     */
    private long consumerPullTimeoutMillis = 1000 * 10;
    /**
     * Consumption pattern, default is clustering.
     */
    private MessageModel messageModel = MessageModel.CLUSTERING;
    /**
     * Message queue listener
     */
    private MessageQueueListener messageQueueListener;
    /**
     * Offset Storage
     */
    private OffsetStore offsetStore;
    /**
     * Topic set you want to register
     */
    private Set<String> registerTopics = new HashSet<String>();
    /**
     * Queue allocation algorithm
     */
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
    /**
     * Whether the unit of subscription group
     */
    private boolean unitMode = false;

    // NOTE(review): presumably the maximum number of re-consume attempts per message;
    // the field is only stored and exposed here - confirm where it is read.
    private int maxReconsumeTimes = 16;


    /**
     * Creates a consumer in {@code MixAll.DEFAULT_CONSUMER_GROUP} without an RPC hook.
     */
    public DefaultMQPullConsumer() {
        this(MixAll.DEFAULT_CONSUMER_GROUP, null);
    }


    /**
     * Creates a consumer for the given group and builds the implementation object.
     *
     * @param consumerGroup
     *         consumer group name
     * @param rpcHook
     *         hook handed to the implementation; the other constructors pass {@code null}
     */
    public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
        this.consumerGroup = consumerGroup;
        defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
    }


    /**
     * Creates a consumer for the given group without an RPC hook.
     *
     * @param consumerGroup
     *         consumer group name
     */
    public DefaultMQPullConsumer(final String consumerGroup) {
        this(consumerGroup, null);
    }


    /**
     * Creates a consumer in {@code MixAll.DEFAULT_CONSUMER_GROUP} with the given RPC hook.
     *
     * @param rpcHook
     *         hook handed to the implementation
     */
    public DefaultMQPullConsumer(RPCHook rpcHook) {
        this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
    }

    /** Same as {@link #createTopic(String, String, int, int)} with a topic system flag of 0. */
    @Override
    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
        createTopic(key, newTopic, queueNum, 0);
    }


    /** Delegates to the implementation's {@code createTopic}. */
    @Override
    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
        this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
    }


    /** Delegates to the implementation's {@code searchOffset}. */
    @Override
    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
        return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
    }


    /** Delegates to the implementation's {@code maxOffset}. */
    @Override
    public long maxOffset(MessageQueue mq) throws MQClientException {
        return this.defaultMQPullConsumerImpl.maxOffset(mq);
    }


    /** Delegates to the implementation's {@code minOffset}. */
    @Override
    public long minOffset(MessageQueue mq) throws MQClientException {
        return this.defaultMQPullConsumerImpl.minOffset(mq);
    }


    /** Delegates to the implementation's {@code earliestMsgStoreTime}. */
    @Override
    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
        return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq);
    }


    /** Delegates to the implementation's {@code viewMessage} using an offset message id. */
    @Override
    public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException,
            InterruptedException, MQClientException {
        return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId);
    }


    /** Delegates to the implementation's {@code queryMessage}. */
    @Override
    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
            throws MQClientException, InterruptedException {
        return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
    }


    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
        return allocateMessageQueueStrategy;
    }


    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    }


    public long getBrokerSuspendMaxTimeMillis() {
        return brokerSuspendMaxTimeMillis;
    }


    public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) {
        this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
    }


    public String getConsumerGroup() {
        return consumerGroup;
    }


    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }


    public long getConsumerPullTimeoutMillis() {
        return consumerPullTimeoutMillis;
    }


    public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
        this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
    }


    public long getConsumerTimeoutMillisWhenSuspend() {
        return consumerTimeoutMillisWhenSuspend;
    }


    public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
        this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
    }


    public MessageModel getMessageModel() {
        return messageModel;
    }


    public void setMessageModel(MessageModel messageModel) {
        this.messageModel = messageModel;
    }


    public MessageQueueListener getMessageQueueListener() {
        return messageQueueListener;
    }


    public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
        this.messageQueueListener = messageQueueListener;
    }


    /** Returns the internal topic set itself, not a copy. */
    public Set<String> getRegisterTopics() {
        return registerTopics;
    }


    public void setRegisterTopics(Set<String> registerTopics) {
        this.registerTopics = registerTopics;
    }


    /** Delegates to the implementation's {@code sendMessageBack} with a {@code null} broker name. */
    @Override
    public void sendMessageBack(MessageExt msg, int delayLevel)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
    }


    /** Delegates to the implementation's {@code sendMessageBack} with the given broker name. */
    @Override
    public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
    }

    /** Delegates to the implementation's {@code fetchSubscribeMessageQueues}. */
    @Override
    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(topic);
    }

    /** Starts the underlying implementation. */
    @Override
    public void start() throws MQClientException {
        this.defaultMQPullConsumerImpl.start();
    }

    /** Shuts down the underlying implementation. */
    @Override
    public void shutdown() {
        this.defaultMQPullConsumerImpl.shutdown();
    }

    /**
     * Adds {@code topic} to the registered topics and, when {@code listener} is not
     * {@code null}, replaces the single message queue listener of this consumer.
     * <p>
     * NOTE(review): the lock is taken on the current {@code registerTopics} instance, which
     * {@link #setRegisterTopics(Set)} can replace - confirm this is acceptable for callers.
     */
    @Override
    public void registerMessageQueueListener(String topic, MessageQueueListener listener) {
        synchronized (this.registerTopics) {
            this.registerTopics.add(topic);
            if (listener != null) {
                this.messageQueueListener = listener;
            }
        }
    }

    /** Delegates to the implementation's four-argument {@code pull}. */
    @Override
    public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums);
    }

    /** Delegates to the implementation's {@code pull} with an explicit timeout. */
    @Override
    public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
    }

    /** Delegates to the implementation's callback-based {@code pull}. */
    @Override
    public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
            throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback);
    }

    /** Delegates to the implementation's callback-based {@code pull} with an explicit timeout. */
    @Override
    public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
            throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
    }

    /** Delegates to the implementation's {@code pullBlockIfNotFound}. */
    @Override
    public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums);
    }

    /** Delegates to the implementation's callback-based {@code pullBlockIfNotFound}. */
    @Override
    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
            throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback);
    }

    /** Delegates to the implementation's {@code updateConsumeOffset}. */
    @Override
    public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
        this.defaultMQPullConsumerImpl.updateConsumeOffset(mq, offset);
    }

    /** Delegates to the implementation's {@code fetchConsumeOffset}. */
    @Override
    public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
        return this.defaultMQPullConsumerImpl.fetchConsumeOffset(mq, fromStore);
    }

    /** Delegates to the implementation's {@code fetchMessageQueuesInBalance}. */
    @Override
    public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException {
        return this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(topic);
    }

    /**
     * Looks a message up by a key that may be either an offset message id or a unique key.
     * <p>
     * The key is first decoded with {@code MessageDecoder.decodeMessageId}; when decoding and
     * the following {@link #viewMessage(String)} call both succeed, that result is returned.
     * If either step throws, the lookup falls back to the implementation's
     * {@code queryMessageByUniqKey(topic, uniqKey)}.
     */
    @Override
    public MessageExt viewMessage(String topic, String uniqKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            MessageDecoder.decodeMessageId(uniqKey);
            return this.viewMessage(uniqKey);
        } catch (Exception e) {
            // Deliberately ignored: any failure here means "not usable as an offset message id",
            // so the unique-key query below is tried instead.
            // NOTE(review): this also swallows an InterruptedException thrown by
            // viewMessage(String) without restoring the interrupt status - confirm intended.
        }
        return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(topic, uniqKey);
    }

    /** Delegates to the implementation's {@code sendMessageBack} with broker name and consumer group. */
    @Override
    public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup);
    }

    public OffsetStore getOffsetStore() {
        return offsetStore;
    }


    public void setOffsetStore(OffsetStore offsetStore) {
        this.offsetStore = offsetStore;
    }


    /** Returns the implementation object created in the constructor. */
    public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() {
        return defaultMQPullConsumerImpl;
    }


    public boolean isUnitMode() {
        return unitMode;
    }


    public void setUnitMode(boolean isUnitMode) {
        this.unitMode = isUnitMode;
    }


    public int getMaxReconsumeTimes() {
        return maxReconsumeTimes;
    }


    public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
        this.maxReconsumeTimes = maxReconsumeTimes;
    }
}
---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java deleted file mode 100644 index f37e982..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ /dev/null @@ -1,519 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.consumer; - -import com.alibaba.rocketmq.client.ClientConfig; -import com.alibaba.rocketmq.client.QueryResult; -import com.alibaba.rocketmq.client.consumer.listener.MessageListener; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; -import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; -import com.alibaba.rocketmq.client.consumer.store.OffsetStore; -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.message.MessageDecoder; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.remoting.RPCHook; -import com.alibaba.rocketmq.remoting.exception.RemotingException; - -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - - -/** - * Wrapped push consumer.in fact,it works as remarkable as the pull consumer - * - * @author shijia.wxr - */ -public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { - protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; - /** - * Do the same thing for the same Group, the application must be set,and - * guarantee Globally unique - */ - private String consumerGroup; - /** - * Consumption pattern,default is clustering - */ - private MessageModel messageModel = MessageModel.CLUSTERING; - /** - * Consumption offset - */ - private ConsumeFromWhere consumeFromWhere = 
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; - /** - * Backtracking consumption time with second precision.time format is - * 20131223171201<br> - * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br> - * Default backtracking consumption time Half an hour ago - */ - private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)); - /** - * Queue allocation algorithm - */ - private AllocateMessageQueueStrategy allocateMessageQueueStrategy; - - /** - * Subscription relationship - */ - private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>(); - /** - * Message listener - */ - private MessageListener messageListener; - /** - * Offset Storage - */ - private OffsetStore offsetStore; - /** - * Minimum consumer thread number - */ - private int consumeThreadMin = 20; - /** - * Max consumer thread number - */ - private int consumeThreadMax = 64; - - /** - * Threshold for dynamic adjustment of the number of thread pool - */ - private long adjustThreadPoolNumsThreshold = 100000; - - /** - * Concurrently max span offset.it has no effect on sequential consumption - */ - private int consumeConcurrentlyMaxSpan = 2000; - /** - * Flow control threshold - */ - private int pullThresholdForQueue = 1000; - /** - * Message pull Interval - */ - private long pullInterval = 0; - /** - * Batch consumption size - */ - private int consumeMessageBatchMaxSize = 1; - /** - * Batch pull size - */ - private int pullBatchSize = 32; - - /** - * Whether update subscription relationship when every pull - */ - private boolean postSubscriptionWhenPull = false; - - /** - * Whether the unit of subscription group - */ - private boolean unitMode = false; - - private int maxReconsumeTimes = -1; - private long suspendCurrentQueueTimeMillis = 1000; - private long consumeTimeout = 15; - - - public DefaultMQPushConsumer() { - this(MixAll.DEFAULT_CONSUMER_GROUP, null, new 
AllocateMessageQueueAveragely()); - } - - - public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { - this.consumerGroup = consumerGroup; - this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; - defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); - } - - - public DefaultMQPushConsumer(RPCHook rpcHook) { - this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); - } - - - public DefaultMQPushConsumer(final String consumerGroup) { - this(consumerGroup, null, new AllocateMessageQueueAveragely()); - } - - @Override - public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - createTopic(key, newTopic, queueNum, 0); - } - - - @Override - public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { - this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); - } - - - @Override - public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp); - } - - - @Override - public long maxOffset(MessageQueue mq) throws MQClientException { - return this.defaultMQPushConsumerImpl.maxOffset(mq); - } - - - @Override - public long minOffset(MessageQueue mq) throws MQClientException { - return this.defaultMQPushConsumerImpl.minOffset(mq); - } - - - @Override - public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq); - } - - - @Override - public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId); - } - - - @Override - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws 
MQClientException, InterruptedException { - return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end); - } - - @Override - public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - try { - MessageDecoder.decodeMessageId(msgId); - return this.viewMessage(msgId); - } catch (Exception e) { - } - return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, msgId); - } - - public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { - return allocateMessageQueueStrategy; - } - - - public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) { - this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; - } - - - public int getConsumeConcurrentlyMaxSpan() { - return consumeConcurrentlyMaxSpan; - } - - - public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) { - this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan; - } - - - public ConsumeFromWhere getConsumeFromWhere() { - return consumeFromWhere; - } - - - public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { - this.consumeFromWhere = consumeFromWhere; - } - - - public int getConsumeMessageBatchMaxSize() { - return consumeMessageBatchMaxSize; - } - - - public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) { - this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; - } - - - public String getConsumerGroup() { - return consumerGroup; - } - - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - - public int getConsumeThreadMax() { - return consumeThreadMax; - } - - - public void setConsumeThreadMax(int consumeThreadMax) { - this.consumeThreadMax = consumeThreadMax; - } - - - public int getConsumeThreadMin() { - return consumeThreadMin; - } - - - public void setConsumeThreadMin(int consumeThreadMin) { - this.consumeThreadMin = 
consumeThreadMin; - } - - - public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() { - return defaultMQPushConsumerImpl; - } - - - public MessageListener getMessageListener() { - return messageListener; - } - - - public void setMessageListener(MessageListener messageListener) { - this.messageListener = messageListener; - } - - - public MessageModel getMessageModel() { - return messageModel; - } - - - public void setMessageModel(MessageModel messageModel) { - this.messageModel = messageModel; - } - - - public int getPullBatchSize() { - return pullBatchSize; - } - - - public void setPullBatchSize(int pullBatchSize) { - this.pullBatchSize = pullBatchSize; - } - - - public long getPullInterval() { - return pullInterval; - } - - - public void setPullInterval(long pullInterval) { - this.pullInterval = pullInterval; - } - - - public int getPullThresholdForQueue() { - return pullThresholdForQueue; - } - - - public void setPullThresholdForQueue(int pullThresholdForQueue) { - this.pullThresholdForQueue = pullThresholdForQueue; - } - - - public Map<String, String> getSubscription() { - return subscription; - } - - - public void setSubscription(Map<String, String> subscription) { - this.subscription = subscription; - } - - - @Override - public void sendMessageBack(MessageExt msg, int delayLevel) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null); - } - - - @Override - public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName); - } - - - @Override - public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { - return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic); - } - - - @Override - public void start() throws 
MQClientException { - this.defaultMQPushConsumerImpl.start(); - } - - - @Override - public void shutdown() { - this.defaultMQPushConsumerImpl.shutdown(); - } - - - @Override - @Deprecated - public void registerMessageListener(MessageListener messageListener) { - this.messageListener = messageListener; - this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); - } - - - @Override - public void registerMessageListener(MessageListenerConcurrently messageListener) { - this.messageListener = messageListener; - this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); - } - - - @Override - public void registerMessageListener(MessageListenerOrderly messageListener) { - this.messageListener = messageListener; - this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); - } - - - @Override - public void subscribe(String topic, String subExpression) throws MQClientException { - this.defaultMQPushConsumerImpl.subscribe(topic, subExpression); - } - - - @Override - public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { - this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource); - } - - - @Override - public void unsubscribe(String topic) { - this.defaultMQPushConsumerImpl.unsubscribe(topic); - } - - - @Override - public void updateCorePoolSize(int corePoolSize) { - this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize); - } - - - @Override - public void suspend() { - this.defaultMQPushConsumerImpl.suspend(); - } - - - @Override - public void resume() { - this.defaultMQPushConsumerImpl.resume(); - } - - - public OffsetStore getOffsetStore() { - return offsetStore; - } - - - public void setOffsetStore(OffsetStore offsetStore) { - this.offsetStore = offsetStore; - } - - - public String getConsumeTimestamp() { - return consumeTimestamp; - } - - - public void setConsumeTimestamp(String consumeTimestamp) { - this.consumeTimestamp = consumeTimestamp; - 
} - - - public boolean isPostSubscriptionWhenPull() { - return postSubscriptionWhenPull; - } - - - public void setPostSubscriptionWhenPull(boolean postSubscriptionWhenPull) { - this.postSubscriptionWhenPull = postSubscriptionWhenPull; - } - - - public boolean isUnitMode() { - return unitMode; - } - - - public void setUnitMode(boolean isUnitMode) { - this.unitMode = isUnitMode; - } - - - public long getAdjustThreadPoolNumsThreshold() { - return adjustThreadPoolNumsThreshold; - } - - - public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) { - this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold; - } - - - public int getMaxReconsumeTimes() { - return maxReconsumeTimes; - } - - - public void setMaxReconsumeTimes(final int maxReconsumeTimes) { - this.maxReconsumeTimes = maxReconsumeTimes; - } - - - public long getSuspendCurrentQueueTimeMillis() { - return suspendCurrentQueueTimeMillis; - } - - - public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) { - this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; - } - - - public long getConsumeTimeout() { - return consumeTimeout; - } - - public void setConsumeTimeout(final long consumeTimeout) { - this.consumeTimeout = consumeTimeout; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java deleted file mode 100644 index 2a46b65..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer; - -import com.alibaba.rocketmq.client.MQAdmin; -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.remoting.exception.RemotingException; - -import java.util.Set; - - -/** - * Message queue consumer interface - * - * @author shijia.wxr - */ -public interface MQConsumer extends MQAdmin { - /** - * If consuming failure,message will be send back to the brokers,and delay consuming some time - * - * @param msg - * @param delayLevel - * - * @throws InterruptedException - * @throws MQBrokerException - * @throws RemotingException - * @throws MQClientException - */ - @Deprecated - void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException; - - - /** - * If consuming failure,message will be send back to the broker,and delay consuming some time - * - * @param msg - * @param delayLevel - * @param brokerName - * - * @throws RemotingException - * @throws MQBrokerException - * @throws InterruptedException - * @throws MQClientException - */ 
- void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - - - /** - * Fetch message queues from consumer cache according to the topic - * - * @param topic - * message topic - * - * @return queue set - * - * @throws MQClientException - */ - Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException; -}