http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
 
b/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
deleted file mode 100644
index 1c93b02..0000000
--- 
a/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-/**
- * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
- */
-package com.alibaba.rocketmq.broker.topic;
-
-import com.alibaba.rocketmq.broker.BrokerTestHarness;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.TopicConfig;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-
-/**
- * @author zander
- */
-public class TopicConfigManagerTest extends BrokerTestHarness {
-    @Test
-    public void testFlushTopicConfig() throws Exception {
-        TopicConfigManager topicConfigManager = new 
TopicConfigManager(brokerController);
-
-        for (int i = 0; i < 10; i++) {
-            String topic = "UNITTEST-" + i;
-            TopicConfig topicConfig = 
topicConfigManager.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, 
null, 4, 0);
-            assertNotNull(topicConfig);
-        }
-        topicConfigManager.persist();
-
-        topicConfigManager.getTopicConfigTable().clear();
-
-        for (int i = 0; i < 10; i++) {
-            String topic = "UNITTEST-" + i;
-            TopicConfig topicConfig = 
topicConfigManager.selectTopicConfig(topic);
-            assertNull(topicConfig);
-        }
-        topicConfigManager.load();
-        for (int i = 0; i < 10; i++) {
-            String topic = "UNITTEST-" + i;
-            TopicConfig topicConfig = 
topicConfigManager.selectTopicConfig(topic);
-            assertNotNull(topicConfig);
-            assertEquals(topicConfig.getTopicSysFlag(), 0);
-            assertEquals(topicConfig.getReadQueueNums(), 4);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java 
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
new file mode 100644
index 0000000..79f82a6
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author shtykh_roman
+ */
+public class BrokerControllerTest {
+    protected Logger logger = 
LoggerFactory.getLogger(BrokerControllerTest.class);
+
+    private static final int RESTART_NUM = 3;
+
+    /**
+     * Tests if the controller can be properly stopped and started.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testRestart() throws Exception {
+
+        for (int i = 0; i < RESTART_NUM; i++) {
+            BrokerController brokerController = new BrokerController(//
+                new BrokerConfig(), //
+                new NettyServerConfig(), //
+                new NettyClientConfig(), //
+                new MessageStoreConfig());
+            boolean initResult = brokerController.initialize();
+            Assert.assertTrue(initResult);
+            logger.info("Broker is initialized " + initResult);
+            brokerController.start();
+            logger.info("Broker is started");
+
+            brokerController.shutdown();
+            logger.info("Broker is stopped");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java 
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
new file mode 100644
index 0000000..4b4fd95
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: BrokerTestHarness.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package org.apache.rocketmq.broker;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Random;
+
+/**
+ * @author zander
+ */
+public class BrokerTestHarness {
+
+    protected BrokerController brokerController = null;
+
+    protected Random random = new Random();
+    public final String BROKER_NAME = "TestBrokerName";
+    protected String brokerAddr = "";
+    protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class);
+    protected BrokerConfig brokerConfig = new BrokerConfig();
+    protected NettyServerConfig nettyServerConfig = new NettyServerConfig();
+    protected NettyClientConfig nettyClientConfig = new NettyClientConfig();
+    protected MessageStoreConfig storeConfig = new MessageStoreConfig();
+
+    @Before
+    public void startup() throws Exception {
+        brokerConfig.setBrokerName(BROKER_NAME);
+        brokerConfig.setBrokerIP1("127.0.0.1");
+        storeConfig.setStorePathRootDir(System.getProperty("user.home") + 
File.separator + "unitteststore");
+        storeConfig.setStorePathCommitLog(System.getProperty("user.home") + 
File.separator + "unitteststore" + File.separator + "commitlog");
+        nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
+        brokerAddr = brokerConfig.getBrokerIP1() + ":" + 
nettyServerConfig.getListenPort();
+        brokerController = new BrokerController(brokerConfig, 
nettyServerConfig, nettyClientConfig, storeConfig);
+        boolean initResult = brokerController.initialize();
+        Assert.assertTrue(initResult);
+        logger.info("Broker Start name:{} addr:{}", 
brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+        brokerController.start();
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        if (brokerController != null) {
+            brokerController.shutdown();
+        }
+        //maybe need to clean the file store. But we do not suggest deleting 
anything.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java 
b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
new file mode 100644
index 0000000..9988a7c
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package org.apache.rocketmq.broker.api;
+
+import org.apache.rocketmq.broker.BrokerTestHarness;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * @author zander
+ */
+public class SendMessageTest extends BrokerTestHarness{
+
+    MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), 
null, null, new ClientConfig());
+    String topic = "UnitTestTopic";
+
+    @Before
+    @Override
+    public void startup() throws Exception {
+        super.startup();
+        client.start();
+
+    }
+
+    @After
+    @Override
+    public void shutdown() throws Exception {
+        client.shutdown();
+        super.shutdown();
+    }
+
+    @Test
+    public void testSendSingle() throws Exception{
+        Message msg = new Message(topic, "TAG1 TAG2", "100200300", 
"body".getBytes());
+        SendMessageRequestHeader requestHeader = new 
SendMessageRequestHeader();
+        requestHeader.setProducerGroup("abc");
+        requestHeader.setTopic(msg.getTopic());
+        requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
+        requestHeader.setDefaultTopicQueueNums(4);
+        requestHeader.setQueueId(0);
+        requestHeader.setSysFlag(0);
+        requestHeader.setBornTimestamp(System.currentTimeMillis());
+        requestHeader.setFlag(msg.getFlag());
+        
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+        SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, 
requestHeader, 1000 * 5,
+                CommunicationMode.SYNC, new SendMessageContext(), null);
+        assertEquals(result.getSendStatus(), SendStatus.SEND_OK);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
new file mode 100644
index 0000000..cdbddf9
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package org.apache.rocketmq.broker.offset;
+
+import org.apache.rocketmq.broker.BrokerTestHarness;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * @author zander
+ */
+public class ConsumerOffsetManagerTest extends BrokerTestHarness {
+
+    @Test
+    public void testFlushConsumerOffset() throws Exception {
+        ConsumerOffsetManager consumerOffsetManager = new 
ConsumerOffsetManager(brokerController);
+        for (int i = 0; i < 10; i++) { // 10 groups x 10 queues x 3 topics, offset = id + 100
+            String group = "UNIT_TEST_GROUP_" + i;
+            for (int id = 0; id < 10; id++) {
+                consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id, 
id + 100);
+                consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id, 
id + 100);
+                consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id, 
id + 100);
+            }
+        }
+        consumerOffsetManager.persist(); // persist first, then drop the in-memory table
+        consumerOffsetManager.getOffsetTable().clear();
+        for (int i = 0; i < 10; i++) { // table is empty: every lookup must yield -1
+            String group = "UNIT_TEST_GROUP_" + i;
+            for (int id = 0; id < 10; id++) {
+                assertEquals(consumerOffsetManager.queryOffset(group, 
"TOPIC_A", id), -1);
+                assertEquals(consumerOffsetManager.queryOffset(group, 
"TOPIC_B", id), -1);
+                assertEquals(consumerOffsetManager.queryOffset(group, 
"TOPIC_C", id), -1);
+            }
+        }
+        consumerOffsetManager.load(); // after load() every committed offset must be back
+        for (int i = 0; i < 10; i++) { // check all three topics, not just A and B
+            String group = "UNIT_TEST_GROUP_" + i;
+            for (int id = 0; id < 10; id++) {
+                assertEquals(consumerOffsetManager.queryOffset(group, 
"TOPIC_A", id), id + 100);
+                assertEquals(consumerOffsetManager.queryOffset(group, 
"TOPIC_B", id), id + 100);
+                assertEquals(consumerOffsetManager.queryOffset(group, 
"TOPIC_C", id), id + 100);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
new file mode 100644
index 0000000..1de17e6
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package org.apache.rocketmq.broker.topic;
+
+import org.apache.rocketmq.broker.BrokerTestHarness;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * @author zander
+ */
+public class TopicConfigManagerTest extends BrokerTestHarness {
+    @Test
+    public void testFlushTopicConfig() throws Exception {
+        TopicConfigManager topicConfigManager = new 
TopicConfigManager(brokerController);
+
+        for (int i = 0; i < 10; i++) {
+            String topic = "UNITTEST-" + i;
+            TopicConfig topicConfig = 
topicConfigManager.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, 
null, 4, 0);
+            assertNotNull(topicConfig);
+        }
+        topicConfigManager.persist();
+
+        topicConfigManager.getTopicConfigTable().clear();
+
+        for (int i = 0; i < 10; i++) {
+            String topic = "UNITTEST-" + i;
+            TopicConfig topicConfig = 
topicConfigManager.selectTopicConfig(topic);
+            assertNull(topicConfig);
+        }
+        topicConfigManager.load();
+        for (int i = 0; i < 10; i++) {
+            String topic = "UNITTEST-" + i;
+            TopicConfig topicConfig = 
topicConfigManager.selectTopicConfig(topic);
+            assertNotNull(topicConfig);
+            assertEquals(topicConfig.getTopicSysFlag(), 0);
+            assertEquals(topicConfig.getReadQueueNums(), 4);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 63a6114..86d38cf 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -18,7 +18,7 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <groupId>com.alibaba.rocketmq</groupId>
+        <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
         <version>4.0.0-SNAPSHOT</version>
     </parent>
@@ -56,19 +56,19 @@
                                 <includes>
                                     <include>com.alibaba:fastjson</include>
                                     <include>io.netty:netty-all</include>
-                                    
<include>com.alibaba.rocketmq:rocketmq-client</include>
-                                    
<include>com.alibaba.rocketmq:rocketmq-common</include>
-                                    
<include>com.alibaba.rocketmq:rocketmq-remoting</include>
+                                    
<include>org.apache.rocketmq:rocketmq-client</include>
+                                    
<include>org.apache.rocketmq:rocketmq-common</include>
+                                    
<include>org.apache.rocketmq:rocketmq-remoting</include>
                                 </includes>
                             </artifactSet>
                             <relocations>
                                 <relocation>
                                     <pattern>io.netty</pattern>
-                                    
<shadedPattern>com.alibaba.rocketmq.shade.io.netty</shadedPattern>
+                                    
<shadedPattern>org.apache.rocketmq.shade.io.netty</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>com.alibaba.fastjson</pattern>
-                                    
<shadedPattern>com.alibaba.rocketmq.shade.com.alibaba.fastjson</shadedPattern>
+                                    
<shadedPattern>org.apache.rocketmq.shade.com.alibaba.fastjson</shadedPattern>
                                 </relocation>
                             </relocations>
                         </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java 
b/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
deleted file mode 100644
index 4d80564..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-
-
-/**
- * Client Common configuration
- *
- * @author shijia.wxr
- * @author vongosling
- */
-public class ClientConfig {
-    public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = 
"com.rocketmq.sendMessageWithVIPChannel";
-    private String namesrvAddr = 
System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, 
System.getenv(MixAll.NAMESRV_ADDR_ENV));
-    private String clientIP = RemotingUtil.getLocalAddress();
-    private String instanceName = System.getProperty("rocketmq.client.name", 
"DEFAULT");
-    private int clientCallbackExecutorThreads = 
Runtime.getRuntime().availableProcessors();
-    /**
-     * Pulling topic information interval from the named server
-     */
-    private int pollNameServerInteval = 1000 * 30;
-    /**
-     * Heartbeat interval in microseconds with message broker
-     */
-    private int heartbeatBrokerInterval = 1000 * 30;
-    /**
-     * Offset persistent interval for consumer
-     */
-    private int persistConsumerOffsetInterval = 1000 * 5;
-    private boolean unitMode = false;
-    private String unitName;
-    private boolean vipChannelEnabled = 
Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, 
"true"));
-
-    public String buildMQClientId() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(this.getClientIP());
-
-        sb.append("@");
-        sb.append(this.getInstanceName());
-        if (!UtilAll.isBlank(this.unitName)) {
-            sb.append("@");
-            sb.append(this.unitName);
-        }
-
-        return sb.toString();
-    }
-
-    public String getClientIP() {
-        return clientIP;
-    }
-
-    public void setClientIP(String clientIP) {
-        this.clientIP = clientIP;
-    }
-
-    public String getInstanceName() {
-        return instanceName;
-    }
-
-    public void setInstanceName(String instanceName) {
-        this.instanceName = instanceName;
-    }
-
-    public void changeInstanceNameToPID() {
-        if (this.instanceName.equals("DEFAULT")) {
-            this.instanceName = String.valueOf(UtilAll.getPid());
-        }
-    }
-
-    public void resetClientConfig(final ClientConfig cc) {
-        this.namesrvAddr = cc.namesrvAddr;
-        this.clientIP = cc.clientIP;
-        this.instanceName = cc.instanceName;
-        this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads;
-        this.pollNameServerInteval = cc.pollNameServerInteval;
-        this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
-        this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
-        this.unitMode = cc.unitMode;
-        this.unitName = cc.unitName;
-        this.vipChannelEnabled = cc.vipChannelEnabled;
-    }
-
-    public ClientConfig cloneClientConfig() {
-        ClientConfig cc = new ClientConfig();
-        cc.namesrvAddr = namesrvAddr;
-        cc.clientIP = clientIP;
-        cc.instanceName = instanceName;
-        cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
-        cc.pollNameServerInteval = pollNameServerInteval;
-        cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
-        cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
-        cc.unitMode = unitMode;
-        cc.unitName = unitName;
-        cc.vipChannelEnabled = vipChannelEnabled;
-        return cc;
-    }
-
-    public String getNamesrvAddr() {
-        return namesrvAddr;
-    }
-
-    public void setNamesrvAddr(String namesrvAddr) {
-        this.namesrvAddr = namesrvAddr;
-    }
-
-    public int getClientCallbackExecutorThreads() {
-        return clientCallbackExecutorThreads;
-    }
-
-
-    public void setClientCallbackExecutorThreads(int 
clientCallbackExecutorThreads) {
-        this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
-    }
-
-
-    public int getPollNameServerInteval() {
-        return pollNameServerInteval;
-    }
-
-
-    public void setPollNameServerInteval(int pollNameServerInteval) {
-        this.pollNameServerInteval = pollNameServerInteval;
-    }
-
-
-    public int getHeartbeatBrokerInterval() {
-        return heartbeatBrokerInterval;
-    }
-
-
-    public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
-        this.heartbeatBrokerInterval = heartbeatBrokerInterval;
-    }
-
-
-    public int getPersistConsumerOffsetInterval() {
-        return persistConsumerOffsetInterval;
-    }
-
-
-    public void setPersistConsumerOffsetInterval(int 
persistConsumerOffsetInterval) {
-        this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
-    }
-
-
-    public String getUnitName() {
-        return unitName;
-    }
-
-
-    public void setUnitName(String unitName) {
-        this.unitName = unitName;
-    }
-
-
-    public boolean isUnitMode() {
-        return unitMode;
-    }
-
-
-    public void setUnitMode(boolean unitMode) {
-        this.unitMode = unitMode;
-    }
-
-
-    public boolean isVipChannelEnabled() {
-        return vipChannelEnabled;
-    }
-
-
-    public void setVipChannelEnabled(final boolean vipChannelEnabled) {
-        this.vipChannelEnabled = vipChannelEnabled;
-    }
-
-
-    @Override
-    public String toString() {
-        return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + 
clientIP + ", instanceName=" + instanceName
-                + ", clientCallbackExecutorThreads=" + 
clientCallbackExecutorThreads + ", pollNameServerInteval=" + 
pollNameServerInteval
-                + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", 
persistConsumerOffsetInterval="
-                + persistConsumerOffsetInterval + ", unitMode=" + unitMode + 
", unitName=" + unitName + ", vipChannelEnabled="
-                + vipChannelEnabled + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java 
b/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
deleted file mode 100644
index 4e202e9..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-
-/**
- * Base interface for MQ management: topic creation, offset lookup and message queries.
- *
- * @author shijia.wxr
- */
-public interface MQAdmin {
-    /**
-     * Creates a topic
-     *
-     * @param key
-     *         accesskey
-     * @param newTopic
-     *         topic name
-     * @param queueNum
-     *         topic's queue number
-     *
-     * @throws MQClientException
-     */
-    void createTopic(final String key, final String newTopic, final int 
queueNum)
-            throws MQClientException;
-
-
-    /**
-     * Creates a topic
-     *
-     * @param key
-     *         accesskey
-     * @param newTopic
-     *         topic name
-     * @param queueNum
-     *         topic's queue number
-     * @param topicSysFlag
-     *         topic system flag
-     *
-     * @throws MQClientException
-     */
-    void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag)
-            throws MQClientException;
-
-
-    /**
-     * Gets the message queue offset for a given time in milliseconds.<br>
-     * Call with caution: the lookup incurs extra IO overhead.
-     *
-     * @param mq
-     *         Instance of MessageQueue
-     * @param timestamp
-     *         from when in milliseconds.
-     *
-     * @return offset
-     *
-     * @throws MQClientException
-     */
-    long searchOffset(final MessageQueue mq, final long timestamp) throws 
MQClientException;
-
-
-    /**
-     * Gets the max offset
-     *
-     * @param mq
-     *         Instance of MessageQueue
-     *
-     * @return the max offset
-     *
-     * @throws MQClientException
-     */
-    long maxOffset(final MessageQueue mq) throws MQClientException;
-
-
-    /**
-     * Gets the minimum offset
-     *
-     * @param mq
-     *         Instance of MessageQueue
-     *
-     * @return the minimum offset
-     *
-     * @throws MQClientException
-     */
-    long minOffset(final MessageQueue mq) throws MQClientException;
-
-
-    /**
-     * Gets the earliest stored message time
-     *
-     * @param mq
-     *         Instance of MessageQueue
-     *
-     * @return the time in microseconds (NOTE(review): other timestamps in this
-     *         interface are in milliseconds; confirm the unit against the implementation)
-     *
-     * @throws MQClientException
-     */
-    long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
-
-
-    /**
-     * Queries a message according to its message id
-     *
-     * @param offsetMsgId
-     *         message id
-     *
-     * @return message
-     *
-     * @throws InterruptedException
-     * @throws MQBrokerException
-     * @throws RemotingException
-     * @throws MQClientException
-     */
-    MessageExt viewMessage(final String offsetMsgId) throws RemotingException, 
MQBrokerException,
-            InterruptedException, MQClientException;
-
-
-    /**
-     * Query messages
-     *
-     * @param topic
-     *         message topic
-     * @param key
-     *         message key index word
-     * @param maxNum
-     *         max message number
-     * @param begin
-     *         from when
-     * @param end
-     *         to when
-     *
-     * @return Instance of QueryResult
-     *
-     * @throws MQClientException
-     * @throws InterruptedException
-     */
-    QueryResult queryMessage(final String topic, final String key, final int 
maxNum, final long begin,
-                             final long end) throws MQClientException, 
InterruptedException;
-    
-    /**
-     * Looks up a message by topic and message id.
-     *
-     * @param topic
-     *         message topic
-     * @param msgId
-     *         message id
-     * @return The {@code MessageExt} of given msgId
-     * @throws RemotingException
-     * @throws MQBrokerException
-     * @throws InterruptedException
-     * @throws MQClientException
-     */
-    MessageExt viewMessage(String topic, String msgId) throws 
RemotingException, MQBrokerException, InterruptedException, MQClientException;  
      
-
-    
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java 
b/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
deleted file mode 100644
index 5934b49..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.slf4j.Logger;
-
-import java.util.Set;
-import java.util.TreeSet;
-
-
-/**
- * Helper for moving a consumer group's consume offsets to the position that
- * corresponds to a point in time.
- *
- * @author shijia.wxr
- */
-public class MQHelper {
-    /**
-     * Resets the consume offsets of a topic according to time, using the
-     * instance name {@code "DEFAULT"}.
-     *
-     * @param messageModel
-     *         which model
-     * @param consumerGroup
-     *         consumer group
-     * @param topic
-     *         topic
-     * @param timestamp
-     *         time
-     *
-     * @throws Exception
-     *         whatever the five-argument overload throws
-     */
-    public static void resetOffsetByTimestamp(
-            final MessageModel messageModel,
-            final String consumerGroup,
-            final String topic,
-            final long timestamp) throws Exception {
-        resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp);
-    }
-
-    /**
-     * Reset consumer topic offset according to time.
-     * <p>
-     * A temporary pull consumer is started, every queue of the topic is looked
-     * up by timestamp, and each non-negative offset found is recorded. The
-     * offsets are persisted and the consumer is shut down before returning,
-     * whether or not the reset succeeded.
-     *
-     * @param messageModel
-     *         which model
-     * @param instanceName
-     *         which instance
-     * @param consumerGroup
-     *         consumer group
-     * @param topic
-     *         topic
-     * @param timestamp
-     *         time
-     *
-     * @throws Exception
-     *         any failure while starting the consumer, fetching queues,
-     *         searching or updating offsets, or persisting them
-     */
-    public static void resetOffsetByTimestamp(
-            final MessageModel messageModel,
-            final String instanceName,
-            final String consumerGroup,
-            final String topic,
-            final long timestamp) throws Exception {
-        final Logger log = ClientLogger.getLog();
-
-        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
-        consumer.setInstanceName(instanceName);
-        consumer.setMessageModel(messageModel);
-        consumer.start();
-
-        Set<MessageQueue> mqs = null;
-        try {
-            mqs = consumer.fetchSubscribeMessageQueues(topic);
-            if (mqs != null && !mqs.isEmpty()) {
-                // Copy into a TreeSet so the queues are processed in their natural order.
-                TreeSet<MessageQueue> mqsNew = new TreeSet<MessageQueue>(mqs);
-                for (MessageQueue mq : mqsNew) {
-                    long offset = consumer.searchOffset(mq, timestamp);
-                    // Negative offsets are skipped rather than written.
-                    if (offset >= 0) {
-                        consumer.updateConsumeOffset(mq, offset);
-                        log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}",
-                                consumerGroup, offset, mq);
-                    }
-                }
-            }
-        } catch (Exception e) {
-            log.warn("resetOffsetByTimestamp Exception", e);
-            throw e;
-        } finally {
-            // Nested try/finally: a failure while persisting must not skip the
-            // shutdown, otherwise the started consumer would be leaked.
-            try {
-                if (mqs != null) {
-                    consumer.getDefaultMQPullConsumerImpl().getOffsetStore().persistAll(mqs);
-                }
-            } finally {
-                consumer.shutdown();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java 
b/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
deleted file mode 100644
index 43c8106..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-
-
-/**
- * Immutable holder pairing a list of messages with an index last-update timestamp.
- *
- * @author shijia.wxr
- */
-public class QueryResult {
-    private final long indexLastUpdateTimestamp;
-    private final List<MessageExt> messageList;
-
-
-    /**
-     * @param indexLastUpdateTimestamp
-     *         timestamp to report from {@link #getIndexLastUpdateTimestamp()}
-     * @param messageList
-     *         list to report from {@link #getMessageList()}; stored by reference, not copied
-     */
-    public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) {
-        this.messageList = messageList;
-        this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
-    }
-
-
-    /**
-     * @return the timestamp given at construction
-     */
-    public long getIndexLastUpdateTimestamp() {
-        return this.indexLastUpdateTimestamp;
-    }
-
-
-    /**
-     * @return the very list instance given at construction
-     */
-    public List<MessageExt> getMessageList() {
-        return this.messageList;
-    }
-
-
-    /**
-     * @return {@code QueryResult [indexLastUpdateTimestamp=..., messageList=...]}
-     */
-    @Override
-    public String toString() {
-        final StringBuilder text = new StringBuilder("QueryResult [indexLastUpdateTimestamp=");
-        text.append(this.indexLastUpdateTimestamp);
-        text.append(", messageList=");
-        text.append(this.messageList);
-        text.append("]");
-        return text.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/Validators.java 
b/client/src/main/java/com/alibaba/rocketmq/client/Validators.java
deleted file mode 100644
index 203aae0..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/Validators.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
-/**
- * Shared validation routines for group names, topic names and outgoing messages.
- *
- * @author manhong.yqd
- */
-public class Validators {
-    public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
-    public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
-    public static final int CHARACTER_MAX_LENGTH = 255;
-
-    /**
-     * Finds the first occurrence of a regular expression inside a string.
-     *
-     * @param origin
-     *         text to search
-     * @param patternStr
-     *         regular expression, compiled on every call
-     *
-     * @return the first matching substring, or {@code null} when nothing matches
-     */
-    public static String getGroupWithRegularExpression(String origin, String patternStr) {
-        final Matcher firstHit = Pattern.compile(patternStr).matcher(origin);
-        return firstHit.find() ? firstHit.group(0) : null;
-    }
-
-    /**
-     * Validates a group name: it must not be blank, must match {@link #PATTERN}
-     * and must not exceed {@link #CHARACTER_MAX_LENGTH} characters.
-     *
-     * @param group
-     *         group name to check
-     *
-     * @throws com.alibaba.rocketmq.client.exception.MQClientException
-     *         when any of the checks fails
-     */
-    public static void checkGroup(String group) throws MQClientException {
-        if (UtilAll.isBlank(group)) {
-            throw new MQClientException("the specified group is blank", null);
-        }
-
-        final boolean legalCharacters = regularExpressionMatcher(group, PATTERN);
-        if (!legalCharacters) {
-            final String reason = String.format(
-                    "the specified group[%s] contains illegal characters, allowing only %s",
-                    group, VALID_PATTERN_STR);
-            throw new MQClientException(reason, null);
-        }
-
-        if (group.length() > CHARACTER_MAX_LENGTH) {
-            throw new MQClientException("the specified group is longer than group max length 255.", null);
-        }
-    }
-
-    /**
-     * Tests whether a whole string matches a compiled pattern.
-     *
-     * @param origin
-     *         text to test
-     * @param pattern
-     *         compiled pattern; {@code null} means "accept anything"
-     *
-     * @return <tt>true</tt> if the pattern is {@code null} or the entire origin
-     *          sequence matches it
-     */
-    public static boolean regularExpressionMatcher(String origin, Pattern pattern) {
-        return pattern == null || pattern.matcher(origin).matches();
-    }
-
-    /**
-     * Validates a message before sending: it must be non-null, carry a valid
-     * topic, and have a non-empty body no larger than the producer's maximum
-     * message size.
-     *
-     * @param msg
-     *         message to check
-     * @param defaultMQProducer
-     *         producer whose {@code getMaxMessageSize()} bounds the body length
-     *
-     * @throws com.alibaba.rocketmq.client.exception.MQClientException
-     *         when any of the checks fails
-     */
-    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
-            throws MQClientException {
-        if (msg == null) {
-            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
-        }
-
-        // The topic is validated first, so topic errors take precedence over body errors.
-        checkTopic(msg.getTopic());
-
-        if (msg.getBody() == null) {
-            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
-        }
-        if (msg.getBody().length == 0) {
-            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
-        }
-        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
-            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
-                    "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
-        }
-    }
-
-    /**
-     * Validates a topic name: it must not be blank, must match {@link #PATTERN},
-     * must not exceed {@link #CHARACTER_MAX_LENGTH} characters and must differ
-     * from {@code MixAll.DEFAULT_TOPIC}.
-     *
-     * @param topic
-     *         topic name to check
-     *
-     * @throws com.alibaba.rocketmq.client.exception.MQClientException
-     *         when any of the checks fails
-     */
-    public static void checkTopic(String topic) throws MQClientException {
-        if (UtilAll.isBlank(topic)) {
-            throw new MQClientException("the specified topic is blank", null);
-        }
-
-        final boolean legalCharacters = regularExpressionMatcher(topic, PATTERN);
-        if (!legalCharacters) {
-            final String reason = String.format(
-                    "the specified topic[%s] contains illegal characters, allowing only %s",
-                    topic, VALID_PATTERN_STR);
-            throw new MQClientException(reason, null);
-        }
-
-        if (topic.length() > CHARACTER_MAX_LENGTH) {
-            throw new MQClientException("the specified topic is longer than topic max length 255.", null);
-        }
-
-        // The reserved default topic name may not be used.
-        if (topic.equals(MixAll.DEFAULT_TOPIC)) {
-            final String reason = String.format("the topic[%s] is conflict with default topic.", topic);
-            throw new MQClientException(reason, null);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java 
b/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
deleted file mode 100644
index 071a872..0000000
--- 
a/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.admin;
-
-/**
- * Marker interface: it declares no methods or constants.
- *
- * @author shijia.wxr
- */
-public interface MQAdminExtInner {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java 
b/client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
deleted file mode 100644
index 88d0eea..0000000
--- 
a/client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.common;
-
-/**
- * Integer codes for client-side error conditions; each constant's name
- * describes the condition it stands for.
- */
-public class ClientErrorCode {
-    public static final int CONNECT_BROKER_EXCEPTION = 10001;
-    public static final int ACCESS_BROKER_TIMEOUT = 10002;
-    public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
-    public static final int NO_NAME_SERVER_EXCEPTION = 10004;
-    public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java 
b/client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java
deleted file mode 100644
index 63fda5d..0000000
--- 
a/client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.common;
-
-import java.util.Random;
-
-/**
- * Per-thread, non-negative counter that starts from a random value.
- */
-public class ThreadLocalIndex {
-    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
-    private final Random random = new Random();
-
-    /**
-     * @param value
-     *         ignored; the starting point is always chosen randomly per thread
-     */
-    public ThreadLocalIndex(int value) {
-    }
-
-    /**
-     * Advances the calling thread's counter by one and returns the new value.
-     * On a thread's first call the counter is seeded with a random non-negative
-     * number before being advanced.
-     *
-     * @return the advanced counter, never negative; it restarts at 0 on overflow
-     */
-    public int getAndIncrement() {
-        final Integer stored = threadLocalIndex.get();
-
-        int current;
-        if (stored != null) {
-            current = stored;
-        } else {
-            // Math.abs(Integer.MIN_VALUE) is still negative, hence the clamp.
-            final int seed = Math.abs(random.nextInt());
-            current = seed < 0 ? 0 : seed;
-        }
-
-        // current + 1 overflows to Integer.MIN_VALUE at the top of the range; clamp to 0.
-        int next = Math.abs(current + 1);
-        if (next < 0) {
-            next = 0;
-        }
-
-        threadLocalIndex.set(next);
-        return next;
-    }
-
-    /**
-     * @return {@code ThreadLocalIndex{threadLocalIndex=N}} where N is the calling
-     *         thread's current value, or {@code null} if it has none yet
-     */
-    @Override
-    public String toString() {
-        return "ThreadLocalIndex{" + "threadLocalIndex=" + threadLocalIndex.get() + '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
 
b/client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
deleted file mode 100644
index 4d70167..0000000
--- 
a/client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-
-
-/**
- * Strategy for allocating message queues among the consumers of a group.
- *
- * @author shijia.wxr
- * @author vongosling
- */
-public interface AllocateMessageQueueStrategy {
-
-    /**
-     * Allocates message queues to the consumer identified by {@code currentCID}.
-     *
-     * @param consumerGroup
-     *         current consumer group
-     * @param currentCID
-     *         current consumer id
-     * @param mqAll
-     *         message queue set in current topic
-     * @param cidAll
-     *         consumer set in current consumer group
-     *
-     * @return The allocation result of the given strategy
-     */
-    List<MessageQueue> allocate(
-            final String consumerGroup,
-            final String currentCID,
-            final List<MessageQueue> mqAll,
-            final List<String> cidAll
-    );
-
-
-    /**
-     * Algorithm name
-     *
-     * @return The strategy name
-     */
-    String getName();
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java
 
b/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java
deleted file mode 100644
index 96040ae..0000000
--- 
a/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.client.ClientConfig;
-import com.alibaba.rocketmq.client.QueryResult;
-import 
com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
-import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * Default pulling consumer
- *
- * @author shijia.wxr
- */
-public class DefaultMQPullConsumer extends ClientConfig implements 
MQPullConsumer {
-    protected final transient DefaultMQPullConsumerImpl 
defaultMQPullConsumerImpl;
-
-    /**
-     * Do the same thing for the same Group, the application must be set,and
-     * guarantee Globally unique
-     */
-    private String consumerGroup;
-    /**
-     * Long polling mode, the Consumer connection max suspend time, it is not
-     * recommended to modify
-     */
-    private long brokerSuspendMaxTimeMillis = 1000 * 20;
-    /**
-     * Long polling mode, the Consumer connection timeout(must greater than
-     * brokerSuspendMaxTimeMillis), it is not recommended to modify
-     */
-    private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
-    /**
-     * The socket timeout in milliseconds
-     */
-    private long consumerPullTimeoutMillis = 1000 * 10;
-    /**
-     * Consumption pattern,default is clustering
-     */
-    private MessageModel messageModel = MessageModel.CLUSTERING;
-    /**
-     * Message queue listener
-     */
-    private MessageQueueListener messageQueueListener;
-    /**
-     * Offset Storage
-     */
-    private OffsetStore offsetStore;
-    /**
-     * Topic set you want to register
-     */
-    private Set<String> registerTopics = new HashSet<String>();
-    /**
-     * Queue allocation algorithm
-     */
-    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new 
AllocateMessageQueueAveragely();
-    /**
-     * Whether the unit of subscription group
-     */
-    private boolean unitMode = false;
-
-    private int maxReconsumeTimes = 16;
-
-
-    public DefaultMQPullConsumer() {
-        this(MixAll.DEFAULT_CONSUMER_GROUP, null);
-    }
-
-
-    public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
-        this.consumerGroup = consumerGroup;
-        defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, 
rpcHook);
-    }
-
-
-    public DefaultMQPullConsumer(final String consumerGroup) {
-        this(consumerGroup, null);
-    }
-
-
-    public DefaultMQPullConsumer(RPCHook rpcHook) {
-        this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
-    }
-
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum) throws 
MQClientException {
-        createTopic(key, newTopic, queueNum, 0);
-    }
-
-
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag) throws MQClientException {
-        this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, 
topicSysFlag);
-    }
-
-
-    @Override
-    public long searchOffset(MessageQueue mq, long timestamp) throws 
MQClientException {
-        return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
-    }
-
-
-    @Override
-    public long maxOffset(MessageQueue mq) throws MQClientException {
-        return this.defaultMQPullConsumerImpl.maxOffset(mq);
-    }
-
-
-    @Override
-    public long minOffset(MessageQueue mq) throws MQClientException {
-        return this.defaultMQPullConsumerImpl.minOffset(mq);
-    }
-
-
-    @Override
-    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException 
{
-        return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq);
-    }
-
-
-    @Override
-    public MessageExt viewMessage(String offsetMsgId) throws 
RemotingException, MQBrokerException,
-            InterruptedException, MQClientException {
-        return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId);
-    }
-
-
-    @Override
-    public QueryResult queryMessage(String topic, String key, int maxNum, long 
begin, long end)
-            throws MQClientException, InterruptedException {
-        return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, 
begin, end);
-    }
-
-
-    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
-        return allocateMessageQueueStrategy;
-    }
-
-
-    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy 
allocateMessageQueueStrategy) {
-        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
-    }
-
-
-    public long getBrokerSuspendMaxTimeMillis() {
-        return brokerSuspendMaxTimeMillis;
-    }
-
-
-    public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) 
{
-        this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
-    }
-
-
-    public String getConsumerGroup() {
-        return consumerGroup;
-    }
-
-
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-
-
-    public long getConsumerPullTimeoutMillis() {
-        return consumerPullTimeoutMillis;
-    }
-
-
-    public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
-        this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
-    }
-
-
-    public long getConsumerTimeoutMillisWhenSuspend() {
-        return consumerTimeoutMillisWhenSuspend;
-    }
-
-
-    public void setConsumerTimeoutMillisWhenSuspend(long 
consumerTimeoutMillisWhenSuspend) {
-        this.consumerTimeoutMillisWhenSuspend = 
consumerTimeoutMillisWhenSuspend;
-    }
-
-
-    public MessageModel getMessageModel() {
-        return messageModel;
-    }
-
-
-    public void setMessageModel(MessageModel messageModel) {
-        this.messageModel = messageModel;
-    }
-
-
-    public MessageQueueListener getMessageQueueListener() {
-        return messageQueueListener;
-    }
-
-
-    public void setMessageQueueListener(MessageQueueListener 
messageQueueListener) {
-        this.messageQueueListener = messageQueueListener;
-    }
-
-
-    public Set<String> getRegisterTopics() {
-        return registerTopics;
-    }
-
-
-    public void setRegisterTopics(Set<String> registerTopics) {
-        this.registerTopics = registerTopics;
-    }
-
-
-    @Override
-    public void sendMessageBack(MessageExt msg, int delayLevel)
-            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
-    }
-
-
-    @Override
-    public void sendMessageBack(MessageExt msg, int delayLevel, String 
brokerName)
-            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, 
brokerName);
-    }
-
-    @Override
-    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws 
MQClientException {
-        return 
this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(topic);
-    }
-
-    @Override
-    public void start() throws MQClientException {
-        this.defaultMQPullConsumerImpl.start();
-    }
-
-    @Override
-    public void shutdown() {
-        this.defaultMQPullConsumerImpl.shutdown();
-    }
-
-    @Override
-    public void registerMessageQueueListener(String topic, 
MessageQueueListener listener) {
-        synchronized (this.registerTopics) {
-            this.registerTopics.add(topic);
-            if (listener != null) {
-                this.messageQueueListener = listener;
-            }
-        }
-    }
-
-    @Override
-    public PullResult pull(MessageQueue mq, String subExpression, long offset, 
int maxNums)
-            throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
-        return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, 
maxNums);
-    }
-
-    @Override
-    public PullResult pull(MessageQueue mq, String subExpression, long offset, 
int maxNums, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
-        return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, 
maxNums, timeout);
-    }
-
-    @Override
-    public void pull(MessageQueue mq, String subExpression, long offset, int 
maxNums, PullCallback pullCallback)
-            throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, 
maxNums, pullCallback);
-    }
-
-    @Override
-    public void pull(MessageQueue mq, String subExpression, long offset, int 
maxNums, PullCallback pullCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, 
maxNums, pullCallback, timeout);
-    }
-
-    @Override
-    public PullResult pullBlockIfNotFound(MessageQueue mq, String 
subExpression, long offset, int maxNums)
-            throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
-        return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, 
subExpression, offset, maxNums);
-    }
-
-    @Override
-    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, 
long offset, int maxNums, PullCallback pullCallback)
-            throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, 
offset, maxNums, pullCallback);
-    }
-
-    @Override
-    public void updateConsumeOffset(MessageQueue mq, long offset) throws 
MQClientException {
-        this.defaultMQPullConsumerImpl.updateConsumeOffset(mq, offset);
-    }
-
-    @Override
-    public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws 
MQClientException {
-        return this.defaultMQPullConsumerImpl.fetchConsumeOffset(mq, 
fromStore);
-    }
-
-    @Override
-    public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws 
MQClientException {
-        return 
this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(topic);
-    }
-
-    @Override
-    public MessageExt viewMessage(String topic, String uniqKey) throws 
RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        try {
-            MessageDecoder.decodeMessageId(uniqKey);
-            return this.viewMessage(uniqKey);
-        } catch (Exception e) {
-        }
-        return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(topic, 
uniqKey);
-    }
-
-    @Override
-    public void sendMessageBack(MessageExt msg, int delayLevel, String 
brokerName, String consumerGroup)
-            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, 
brokerName, consumerGroup);
-    }
-
-    public OffsetStore getOffsetStore() {
-        return offsetStore;
-    }
-
-
-    public void setOffsetStore(OffsetStore offsetStore) {
-        this.offsetStore = offsetStore;
-    }
-
-
-    public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() {
-        return defaultMQPullConsumerImpl;
-    }
-
-
-    public boolean isUnitMode() {
-        return unitMode;
-    }
-
-
-    public void setUnitMode(boolean isUnitMode) {
-        this.unitMode = isUnitMode;
-    }
-
-
-    public int getMaxReconsumeTimes() {
-        return maxReconsumeTimes;
-    }
-
-
-    public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
-        this.maxReconsumeTimes = maxReconsumeTimes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java
deleted file mode 100644
index f37e982..0000000
--- 
a/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.client.ClientConfig;
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
-import 
com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import 
com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
-import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-
-/**
- * Wrapped push consumer.in fact,it works as remarkable as the pull consumer
- *
- * @author shijia.wxr
- */
-public class DefaultMQPushConsumer extends ClientConfig implements 
MQPushConsumer {
-    protected final transient DefaultMQPushConsumerImpl 
defaultMQPushConsumerImpl;
-    /**
-     * Do the same thing for the same Group, the application must be set,and
-     * guarantee Globally unique
-     */
-    private String consumerGroup;
-    /**
-     * Consumption pattern,default is clustering
-     */
-    private MessageModel messageModel = MessageModel.CLUSTERING;
-    /**
-     * Consumption offset
-     */
-    private ConsumeFromWhere consumeFromWhere = 
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
-    /**
-     * Backtracking consumption time with second precision.time format is
-     * 20131223171201<br>
-     * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
-     * Default backtracking consumption time Half an hour ago
-     */
-    private String consumeTimestamp = 
UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
-    /**
-     * Queue allocation algorithm
-     */
-    private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
-
-    /**
-     * Subscription relationship
-     */
-    private Map<String /* topic */, String /* sub expression */> subscription 
= new HashMap<String, String>();
-    /**
-     * Message listener
-     */
-    private MessageListener messageListener;
-    /**
-     * Offset Storage
-     */
-    private OffsetStore offsetStore;
-    /**
-     * Minimum consumer thread number
-     */
-    private int consumeThreadMin = 20;
-    /**
-     * Max consumer thread number
-     */
-    private int consumeThreadMax = 64;
-
-    /**
-     * Threshold for dynamic adjustment of the number of thread pool
-     */
-    private long adjustThreadPoolNumsThreshold = 100000;
-
-    /**
-     * Concurrently max span offset.it has no effect on sequential consumption
-     */
-    private int consumeConcurrentlyMaxSpan = 2000;
-    /**
-     * Flow control threshold
-     */
-    private int pullThresholdForQueue = 1000;
-    /**
-     * Message pull Interval
-     */
-    private long pullInterval = 0;
-    /**
-     * Batch consumption size
-     */
-    private int consumeMessageBatchMaxSize = 1;
-    /**
-     * Batch pull size
-     */
-    private int pullBatchSize = 32;
-
-    /**
-     * Whether update subscription relationship when every pull
-     */
-    private boolean postSubscriptionWhenPull = false;
-
-    /**
-     * Whether the unit of subscription group
-     */
-    private boolean unitMode = false;
-
-    private int maxReconsumeTimes = -1;
-    private long suspendCurrentQueueTimeMillis = 1000;
-    private long consumeTimeout = 15;
-
-
-    public DefaultMQPushConsumer() {
-        this(MixAll.DEFAULT_CONSUMER_GROUP, null, new 
AllocateMessageQueueAveragely());
-    }
-
-
-    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, 
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
-        this.consumerGroup = consumerGroup;
-        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
-        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
-    }
-
-
-    public DefaultMQPushConsumer(RPCHook rpcHook) {
-        this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new 
AllocateMessageQueueAveragely());
-    }
-
-
-    public DefaultMQPushConsumer(final String consumerGroup) {
-        this(consumerGroup, null, new AllocateMessageQueueAveragely());
-    }
-
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum) throws 
MQClientException {
-        createTopic(key, newTopic, queueNum, 0);
-    }
-
-
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag) throws MQClientException {
-        this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, 
topicSysFlag);
-    }
-
-
-    @Override
-    public long searchOffset(MessageQueue mq, long timestamp) throws 
MQClientException {
-        return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
-    }
-
-
-    @Override
-    public long maxOffset(MessageQueue mq) throws MQClientException {
-        return this.defaultMQPushConsumerImpl.maxOffset(mq);
-    }
-
-
-    @Override
-    public long minOffset(MessageQueue mq) throws MQClientException {
-        return this.defaultMQPushConsumerImpl.minOffset(mq);
-    }
-
-
-    @Override
-    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException 
{
-        return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq);
-    }
-
-
-    @Override
-    public MessageExt viewMessage(String offsetMsgId) throws 
RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId);
-    }
-
-
-    @Override
-    public QueryResult queryMessage(String topic, String key, int maxNum, long 
begin, long end)
-            throws MQClientException, InterruptedException {
-        return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, 
begin, end);
-    }
-
-    @Override
-    public MessageExt viewMessage(String topic, String msgId) throws 
RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        try {
-            MessageDecoder.decodeMessageId(msgId);
-            return this.viewMessage(msgId);
-        } catch (Exception e) {
-        }
-        return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, 
msgId);
-    }
-
-    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
-        return allocateMessageQueueStrategy;
-    }
-
-
-    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy 
allocateMessageQueueStrategy) {
-        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
-    }
-
-
-    public int getConsumeConcurrentlyMaxSpan() {
-        return consumeConcurrentlyMaxSpan;
-    }
-
-
-    public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) {
-        this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
-    }
-
-
-    public ConsumeFromWhere getConsumeFromWhere() {
-        return consumeFromWhere;
-    }
-
-
-    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
-        this.consumeFromWhere = consumeFromWhere;
-    }
-
-
-    public int getConsumeMessageBatchMaxSize() {
-        return consumeMessageBatchMaxSize;
-    }
-
-
-    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
-        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
-    }
-
-
-    public String getConsumerGroup() {
-        return consumerGroup;
-    }
-
-
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-
-
-    public int getConsumeThreadMax() {
-        return consumeThreadMax;
-    }
-
-
-    public void setConsumeThreadMax(int consumeThreadMax) {
-        this.consumeThreadMax = consumeThreadMax;
-    }
-
-
-    public int getConsumeThreadMin() {
-        return consumeThreadMin;
-    }
-
-
-    public void setConsumeThreadMin(int consumeThreadMin) {
-        this.consumeThreadMin = consumeThreadMin;
-    }
-
-
-    public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
-        return defaultMQPushConsumerImpl;
-    }
-
-
-    public MessageListener getMessageListener() {
-        return messageListener;
-    }
-
-
-    public void setMessageListener(MessageListener messageListener) {
-        this.messageListener = messageListener;
-    }
-
-
-    public MessageModel getMessageModel() {
-        return messageModel;
-    }
-
-
-    public void setMessageModel(MessageModel messageModel) {
-        this.messageModel = messageModel;
-    }
-
-
-    public int getPullBatchSize() {
-        return pullBatchSize;
-    }
-
-
-    public void setPullBatchSize(int pullBatchSize) {
-        this.pullBatchSize = pullBatchSize;
-    }
-
-
-    public long getPullInterval() {
-        return pullInterval;
-    }
-
-
-    public void setPullInterval(long pullInterval) {
-        this.pullInterval = pullInterval;
-    }
-
-
-    public int getPullThresholdForQueue() {
-        return pullThresholdForQueue;
-    }
-
-
-    public void setPullThresholdForQueue(int pullThresholdForQueue) {
-        this.pullThresholdForQueue = pullThresholdForQueue;
-    }
-
-
-    public Map<String, String> getSubscription() {
-        return subscription;
-    }
-
-
-    public void setSubscription(Map<String, String> subscription) {
-        this.subscription = subscription;
-    }
-
-
-    @Override
-    public void sendMessageBack(MessageExt msg, int delayLevel)
-            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
-    }
-
-
-    @Override
-    public void sendMessageBack(MessageExt msg, int delayLevel, String 
brokerName)
-            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, 
brokerName);
-    }
-
-
-    @Override
-    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws 
MQClientException {
-        return 
this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic);
-    }
-
-
-    @Override
-    public void start() throws MQClientException {
-        this.defaultMQPushConsumerImpl.start();
-    }
-
-
-    @Override
-    public void shutdown() {
-        this.defaultMQPushConsumerImpl.shutdown();
-    }
-
-
-    @Override
-    @Deprecated
-    public void registerMessageListener(MessageListener messageListener) {
-        this.messageListener = messageListener;
-        
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
-    }
-
-
-    @Override
-    public void registerMessageListener(MessageListenerConcurrently 
messageListener) {
-        this.messageListener = messageListener;
-        
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
-    }
-
-
-    @Override
-    public void registerMessageListener(MessageListenerOrderly 
messageListener) {
-        this.messageListener = messageListener;
-        
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
-    }
-
-
-    @Override
-    public void subscribe(String topic, String subExpression) throws 
MQClientException {
-        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
-    }
-
-
-    @Override
-    public void subscribe(String topic, String fullClassName, String 
filterClassSource) throws MQClientException {
-        this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, 
filterClassSource);
-    }
-
-
-    @Override
-    public void unsubscribe(String topic) {
-        this.defaultMQPushConsumerImpl.unsubscribe(topic);
-    }
-
-
-    @Override
-    public void updateCorePoolSize(int corePoolSize) {
-        this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize);
-    }
-
-
-    @Override
-    public void suspend() {
-        this.defaultMQPushConsumerImpl.suspend();
-    }
-
-
-    @Override
-    public void resume() {
-        this.defaultMQPushConsumerImpl.resume();
-    }
-
-
-    public OffsetStore getOffsetStore() {
-        return offsetStore;
-    }
-
-
-    public void setOffsetStore(OffsetStore offsetStore) {
-        this.offsetStore = offsetStore;
-    }
-
-
-    public String getConsumeTimestamp() {
-        return consumeTimestamp;
-    }
-
-
-    public void setConsumeTimestamp(String consumeTimestamp) {
-        this.consumeTimestamp = consumeTimestamp;
-    }
-
-
-    public boolean isPostSubscriptionWhenPull() {
-        return postSubscriptionWhenPull;
-    }
-
-
-    public void setPostSubscriptionWhenPull(boolean postSubscriptionWhenPull) {
-        this.postSubscriptionWhenPull = postSubscriptionWhenPull;
-    }
-
-
-    public boolean isUnitMode() {
-        return unitMode;
-    }
-
-
-    public void setUnitMode(boolean isUnitMode) {
-        this.unitMode = isUnitMode;
-    }
-
-
-    public long getAdjustThreadPoolNumsThreshold() {
-        return adjustThreadPoolNumsThreshold;
-    }
-
-
-    public void setAdjustThreadPoolNumsThreshold(long 
adjustThreadPoolNumsThreshold) {
-        this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
-    }
-
-
-    public int getMaxReconsumeTimes() {
-        return maxReconsumeTimes;
-    }
-
-
-    public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
-        this.maxReconsumeTimes = maxReconsumeTimes;
-    }
-
-
-    public long getSuspendCurrentQueueTimeMillis() {
-        return suspendCurrentQueueTimeMillis;
-    }
-
-
-    public void setSuspendCurrentQueueTimeMillis(final long 
suspendCurrentQueueTimeMillis) {
-        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
-    }
-
-
-    public long getConsumeTimeout() {
-        return consumeTimeout;
-    }
-
-    public void setConsumeTimeout(final long consumeTimeout) {
-        this.consumeTimeout = consumeTimeout;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java 
b/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java
deleted file mode 100644
index 2a46b65..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.client.MQAdmin;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.Set;
-
-
-/**
- * Message queue consumer interface
- *
- * @author shijia.wxr
- */
-public interface MQConsumer extends MQAdmin {
-    /**
-     * If consuming failure,message will be send back to the brokers,and delay 
consuming some time
-     *
-     * @param msg
-     * @param delayLevel
-     *
-     * @throws InterruptedException
-     * @throws MQBrokerException
-     * @throws RemotingException
-     * @throws MQClientException
-     */
-    @Deprecated
-    void sendMessageBack(final MessageExt msg, final int delayLevel) throws 
RemotingException,
-            MQBrokerException, InterruptedException, MQClientException;
-
-
-    /**
-     * If consuming failure,message will be send back to the broker,and delay 
consuming some time
-     *
-     * @param msg
-     * @param delayLevel
-     * @param brokerName
-     *
-     * @throws RemotingException
-     * @throws MQBrokerException
-     * @throws InterruptedException
-     * @throws MQClientException
-     */
-    void sendMessageBack(final MessageExt msg, final int delayLevel, final 
String brokerName)
-            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException;
-
-
-    /**
-     * Fetch message queues from consumer cache according to the topic
-     *
-     * @param topic
-     *         message topic
-     *
-     * @return queue set
-     *
-     * @throws MQClientException
-     */
-    Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws 
MQClientException;
-}


Reply via email to