http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java
new file mode 100644
index 0000000..a2a52f0
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.filtersrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+public class RegisterFilterServerResponseHeader implements CommandCustomHeader 
{
+    @CFNotNull
+    private String brokerName;
+    @CFNotNull
+    private long brokerId;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public long getBrokerId() {
+        return brokerId;
+    }
+
+
+    public void setBrokerId(long brokerId) {
+        this.brokerId = brokerId;
+    }
+
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java
new file mode 100644
index 0000000..1fc94a9
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.filtersrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+public class RegisterMessageFilterClassRequestHeader implements 
CommandCustomHeader {
+    @CFNotNull
+    private String consumerGroup;
+    @CFNotNull
+    private String topic;
+    @CFNotNull
+    private String className;
+    @CFNotNull
+    private Integer classCRC;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public String getClassName() {
+        return className;
+    }
+
+
+    public void setClassName(String className) {
+        this.className = className;
+    }
+
+
+    public Integer getClassCRC() {
+        return classCRC;
+    }
+
+
+    public void setClassCRC(Integer classCRC) {
+        this.classCRC = classCRC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteKVConfigRequestHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteKVConfigRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteKVConfigRequestHeader.java
new file mode 100644
index 0000000..47ec4b1
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteKVConfigRequestHeader.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class DeleteKVConfigRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String namespace;
+    @CFNotNull
+    private String key;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+
+    public String getKey() {
+        return key;
+    }
+
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteTopicInNamesrvRequestHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteTopicInNamesrvRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteTopicInNamesrvRequestHeader.java
new file mode 100644
index 0000000..5bd0632
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteTopicInNamesrvRequestHeader.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class DeleteTopicInNamesrvRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String topic;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigRequestHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigRequestHeader.java
new file mode 100644
index 0000000..29e4db4
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigRequestHeader.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class GetKVConfigRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String namespace;
+    @CFNotNull
+    private String key;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+
+    public String getKey() {
+        return key;
+    }
+
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigResponseHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigResponseHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigResponseHeader.java
new file mode 100644
index 0000000..3280ff5
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigResponseHeader.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class GetKVConfigResponseHeader implements CommandCustomHeader {
+    @CFNullable
+    private String value;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getValue() {
+        return value;
+    }
+
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVListByNamespaceRequestHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVListByNamespaceRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVListByNamespaceRequestHeader.java
new file mode 100644
index 0000000..bd2816e
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVListByNamespaceRequestHeader.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class GetKVListByNamespaceRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String namespace;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
new file mode 100644
index 0000000..972cf35
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: GetRouteInfoRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class GetRouteInfoRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String topic;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java
new file mode 100644
index 0000000..8bb681e
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: GetRouteInfoResponseHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class GetRouteInfoResponseHeader implements CommandCustomHeader {
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        // TODO Auto-generated method stub
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/PutKVConfigRequestHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/PutKVConfigRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/PutKVConfigRequestHeader.java
new file mode 100644
index 0000000..01e9a5e
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/PutKVConfigRequestHeader.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+public class PutKVConfigRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String namespace;
+    @CFNotNull
+    private String key;
+    @CFNotNull
+    private String value;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+
+    public String getKey() {
+        return key;
+    }
+
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+
+    public String getValue() {
+        return value;
+    }
+
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
new file mode 100644
index 0000000..4c0fca5
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: RegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class RegisterBrokerRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String brokerName;
+    @CFNotNull
+    private String brokerAddr;
+    @CFNotNull
+    private String clusterName;
+    @CFNotNull
+    private String haServerAddr;
+    @CFNotNull
+    private Long brokerId;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+
+    public String getHaServerAddr() {
+        return haServerAddr;
+    }
+
+
+    public void setHaServerAddr(String haServerAddr) {
+        this.haServerAddr = haServerAddr;
+    }
+
+
+    public Long getBrokerId() {
+        return brokerId;
+    }
+
+
+    public void setBrokerId(Long brokerId) {
+        this.brokerId = brokerId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerResponseHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerResponseHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerResponseHeader.java
new file mode 100644
index 0000000..9796054
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerResponseHeader.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class RegisterBrokerResponseHeader implements CommandCustomHeader {
+    @CFNullable
+    private String haServerAddr;
+    @CFNullable
+    private String masterAddr;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getHaServerAddr() {
+        return haServerAddr;
+    }
+
+
+    public void setHaServerAddr(String haServerAddr) {
+        this.haServerAddr = haServerAddr;
+    }
+
+
+    public String getMasterAddr() {
+        return masterAddr;
+    }
+
+
+    public void setMasterAddr(String masterAddr) {
+        this.masterAddr = masterAddr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java
new file mode 100644
index 0000000..cb5b3d9
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: RegisterOrderTopicRequestHeader.java 1835 2013-05-16 02:00:50Z 
shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RegisterOrderTopicRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String topic;
+    @CFNotNull
+    private String orderTopicString;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        // TODO Auto-generated method stub
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public String getOrderTopicString() {
+        return orderTopicString;
+    }
+
+
+    public void setOrderTopicString(String orderTopicString) {
+        this.orderTopicString = orderTopicString;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java
new file mode 100644
index 0000000..f2d174a
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: UnRegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z 
shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class UnRegisterBrokerRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String brokerName;
+    @CFNotNull
+    private String brokerAddr;
+    @CFNotNull
+    private String clusterName;
+    @CFNotNull
+    private Long brokerId;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+
+    public Long getBrokerId() {
+        return brokerId;
+    }
+
+
+    public void setBrokerId(Long brokerId) {
+        this.brokerId = brokerId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerRequestHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerRequestHeader.java
new file mode 100644
index 0000000..f5aebb9
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerRequestHeader.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class WipeWritePermOfBrokerRequestHeader implements CommandCustomHeader 
{
+    @CFNotNull
+    private String brokerName;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerResponseHeader.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerResponseHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerResponseHeader.java
new file mode 100644
index 0000000..e50641b
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerResponseHeader.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class WipeWritePermOfBrokerResponseHeader implements 
CommandCustomHeader {
+    @CFNotNull
+    private Integer wipeTopicCount;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public Integer getWipeTopicCount() {
+        return wipeTopicCount;
+    }
+
+
+    public void setWipeTopicCount(Integer wipeTopicCount) {
+        this.wipeTopicCount = wipeTopicCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java
new file mode 100644
index 0000000..115a885
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: ConsumeType.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.heartbeat;
+
+/**
+ * @author shijia.wxr
+ */
+public enum ConsumeType {
+
+    CONSUME_ACTIVELY("PULL"),
+
+    CONSUME_PASSIVELY("PUSH");
+
+    private String typeCN;
+
+    ConsumeType(String typeCN) {
+        this.typeCN = typeCN;
+    }
+
+
+    public String getTypeCN() {
+        return typeCN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java
new file mode 100644
index 0000000..233da6c
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: ConsumerData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.heartbeat;
+
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerData {
+    private String groupName;
+    private ConsumeType consumeType;
+    private MessageModel messageModel;
+    private ConsumeFromWhere consumeFromWhere;
+    private Set<SubscriptionData> subscriptionDataSet = new 
HashSet<SubscriptionData>();
+    private boolean unitMode;
+
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+
+    public void setConsumeType(ConsumeType consumeType) {
+        this.consumeType = consumeType;
+    }
+
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+
+    public ConsumeFromWhere getConsumeFromWhere() {
+        return consumeFromWhere;
+    }
+
+
+    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        this.consumeFromWhere = consumeFromWhere;
+    }
+
+
+    public Set<SubscriptionData> getSubscriptionDataSet() {
+        return subscriptionDataSet;
+    }
+
+
+    public void setSubscriptionDataSet(Set<SubscriptionData> 
subscriptionDataSet) {
+        this.subscriptionDataSet = subscriptionDataSet;
+    }
+
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+
+    public void setUnitMode(boolean isUnitMode) {
+        this.unitMode = isUnitMode;
+    }
+
+
+    @Override
+    public String toString() {
+        return "ConsumerData [groupName=" + groupName + ", consumeType=" + 
consumeType + ", messageModel="
+                + messageModel + ", consumeFromWhere=" + consumeFromWhere + ", 
unitMode=" + unitMode
+                + ", subscriptionDataSet=" + subscriptionDataSet + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
new file mode 100644
index 0000000..8fa5b17
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: HeartbeatData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.heartbeat;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class HeartbeatData extends RemotingSerializable {
+    private String clientID;
+    private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
+    private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
+
+
+    public String getClientID() {
+        return clientID;
+    }
+
+
+    public void setClientID(String clientID) {
+        this.clientID = clientID;
+    }
+
+
+    public Set<ProducerData> getProducerDataSet() {
+        return producerDataSet;
+    }
+
+
+    public void setProducerDataSet(Set<ProducerData> producerDataSet) {
+        this.producerDataSet = producerDataSet;
+    }
+
+
+    public Set<ConsumerData> getConsumerDataSet() {
+        return consumerDataSet;
+    }
+
+
+    public void setConsumerDataSet(Set<ConsumerData> consumerDataSet) {
+        this.consumerDataSet = consumerDataSet;
+    }
+
+
+    @Override
+    public String toString() {
+        return "HeartbeatData [clientID=" + clientID + ", producerDataSet=" + 
producerDataSet
+                + ", consumerDataSet=" + consumerDataSet + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java
new file mode 100644
index 0000000..4600c6f
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: MessageModel.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.heartbeat;
+
+/**
+ * Message model
+ *
+ * @author shijia.wxr
+ */
+public enum MessageModel {
+    /**
+     * broadcast
+     */
+    BROADCASTING("BROADCASTING"),
+    /**
+     * clustering
+     */
+    CLUSTERING("CLUSTERING");
+
+    private String modeCN;
+
+    MessageModel(String modeCN) {
+        this.modeCN = modeCN;
+    }
+
+
+    public String getModeCN() {
+        return modeCN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java
new file mode 100644
index 0000000..c83b14c
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: ProducerData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.heartbeat;
+
+/**
+ * @author shijia.wxr
+ */
+public class ProducerData {
+    private String groupName;
+
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+
+    @Override
+    public String toString() {
+        return "ProducerData [groupName=" + groupName + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
new file mode 100644
index 0000000..28b49f1
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: SubscriptionData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.heartbeat;
+
+import com.alibaba.fastjson.annotation.JSONField;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SubscriptionData implements Comparable<SubscriptionData> {
+    public final static String SUB_ALL = "*";
+    private boolean classFilterMode = false;
+    private String topic;
+    private String subString;
+    private Set<String> tagsSet = new HashSet<String>();
+    private Set<Integer> codeSet = new HashSet<Integer>();
+    private long subVersion = System.currentTimeMillis();
+
+    @JSONField(serialize = false)
+    private String filterClassSource;
+
+
+    public SubscriptionData() {
+
+    }
+
+
+    public SubscriptionData(String topic, String subString) {
+        super();
+        this.topic = topic;
+        this.subString = subString;
+    }
+
+    public String getFilterClassSource() {
+        return filterClassSource;
+    }
+
+    public void setFilterClassSource(String filterClassSource) {
+        this.filterClassSource = filterClassSource;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public String getSubString() {
+        return subString;
+    }
+
+
+    public void setSubString(String subString) {
+        this.subString = subString;
+    }
+
+
+    public Set<String> getTagsSet() {
+        return tagsSet;
+    }
+
+
+    public void setTagsSet(Set<String> tagsSet) {
+        this.tagsSet = tagsSet;
+    }
+
+
+    public long getSubVersion() {
+        return subVersion;
+    }
+
+
+    public void setSubVersion(long subVersion) {
+        this.subVersion = subVersion;
+    }
+
+
+    public Set<Integer> getCodeSet() {
+        return codeSet;
+    }
+
+
+    public void setCodeSet(Set<Integer> codeSet) {
+        this.codeSet = codeSet;
+    }
+
+
+    public boolean isClassFilterMode() {
+        return classFilterMode;
+    }
+
+
+    public void setClassFilterMode(boolean classFilterMode) {
+        this.classFilterMode = classFilterMode;
+    }
+
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (classFilterMode ? 1231 : 1237);
+        result = prime * result + ((codeSet == null) ? 0 : codeSet.hashCode());
+        result = prime * result + ((subString == null) ? 0 : 
subString.hashCode());
+        result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode());
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        return result;
+    }
+
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        SubscriptionData other = (SubscriptionData) obj;
+        if (classFilterMode != other.classFilterMode)
+            return false;
+        if (codeSet == null) {
+            if (other.codeSet != null)
+                return false;
+        } else if (!codeSet.equals(other.codeSet))
+            return false;
+        if (subString == null) {
+            if (other.subString != null)
+                return false;
+        } else if (!subString.equals(other.subString))
+            return false;
+        if (subVersion != other.subVersion)
+            return false;
+        if (tagsSet == null) {
+            if (other.tagsSet != null)
+                return false;
+        } else if (!tagsSet.equals(other.tagsSet))
+            return false;
+        if (topic == null) {
+            if (other.topic != null)
+                return false;
+        } else if (!topic.equals(other.topic))
+            return false;
+        return true;
+    }
+
+
+    @Override
+    public String toString() {
+        return "SubscriptionData [classFilterMode=" + classFilterMode + ", 
topic=" + topic + ", subString="
+                + subString + ", tagsSet=" + tagsSet + ", codeSet=" + codeSet 
+ ", subVersion=" + subVersion
+                + "]";
+    }
+
+
+    @Override
+    public int compareTo(SubscriptionData other) {
+        String thisValue = this.topic + "@" + this.subString;
+        String otherValue = other.topic + "@" + other.subString;
+        return thisValue.compareTo(otherValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
new file mode 100644
index 0000000..1696cd6
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: BrokerData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.route;
+
+import org.apache.rocketmq.common.MixAll;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class BrokerData implements Comparable<BrokerData> {
+    private String cluster;
+    private String brokerName;
+    private HashMap<Long/* brokerId */, String/* broker address */> 
brokerAddrs;
+
+    public String selectBrokerAddr() {
+        String value = this.brokerAddrs.get(MixAll.MASTER_ID);
+        if (null == value) {
+            for (Map.Entry<Long, String> entry : this.brokerAddrs.entrySet()) {
+                return entry.getValue();
+            }
+        }
+
+        return value;
+    }
+
+    public HashMap<Long, String> getBrokerAddrs() {
+        return brokerAddrs;
+    }
+
+    public void setBrokerAddrs(HashMap<Long, String> brokerAddrs) {
+        this.brokerAddrs = brokerAddrs;
+    }
+
+    public String getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(String cluster) {
+        this.cluster = cluster;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((brokerAddrs == null) ? 0 : 
brokerAddrs.hashCode());
+        result = prime * result + ((brokerName == null) ? 0 : 
brokerName.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        BrokerData other = (BrokerData) obj;
+        if (brokerAddrs == null) {
+            if (other.brokerAddrs != null)
+                return false;
+        } else if (!brokerAddrs.equals(other.brokerAddrs))
+            return false;
+        if (brokerName == null) {
+            if (other.brokerName != null)
+                return false;
+        } else if (!brokerName.equals(other.brokerName))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "BrokerData [brokerName=" + brokerName + ", brokerAddrs=" + 
brokerAddrs + "]";
+    }
+
+    @Override
+    public int compareTo(BrokerData o) {
+        return this.brokerName.compareTo(o.getBrokerName());
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java
new file mode 100644
index 0000000..de736be
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: QueueData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.route;
+
+public class QueueData implements Comparable<QueueData> {
+    private String brokerName;
+    private int readQueueNums;
+    private int writeQueueNums;
+    private int perm;
+    private int topicSynFlag;
+
+    public int getReadQueueNums() {
+        return readQueueNums;
+    }
+
+    public void setReadQueueNums(int readQueueNums) {
+        this.readQueueNums = readQueueNums;
+    }
+
+    public int getWriteQueueNums() {
+        return writeQueueNums;
+    }
+
+    public void setWriteQueueNums(int writeQueueNums) {
+        this.writeQueueNums = writeQueueNums;
+    }
+
+    public int getPerm() {
+        return perm;
+    }
+
+    public void setPerm(int perm) {
+        this.perm = perm;
+    }
+
+    public int getTopicSynFlag() {
+        return topicSynFlag;
+    }
+
+    public void setTopicSynFlag(int topicSynFlag) {
+        this.topicSynFlag = topicSynFlag;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((brokerName == null) ? 0 : 
brokerName.hashCode());
+        result = prime * result + perm;
+        result = prime * result + readQueueNums;
+        result = prime * result + writeQueueNums;
+        result = prime * result + topicSynFlag;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        QueueData other = (QueueData) obj;
+        if (brokerName == null) {
+            if (other.brokerName != null)
+                return false;
+        } else if (!brokerName.equals(other.brokerName))
+            return false;
+        if (perm != other.perm)
+            return false;
+        if (readQueueNums != other.readQueueNums)
+            return false;
+        if (writeQueueNums != other.writeQueueNums)
+            return false;
+        if (topicSynFlag != other.topicSynFlag)
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "QueueData [brokerName=" + brokerName + ", readQueueNums=" + 
readQueueNums
+                + ", writeQueueNums=" + writeQueueNums + ", perm=" + perm + ", 
topicSynFlag=" + topicSynFlag
+                + "]";
+    }
+
+    @Override
+    public int compareTo(QueueData o) {
+        return this.brokerName.compareTo(o.getBrokerName());
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
new file mode 100644
index 0000000..13c5273
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: TopicRouteData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.route;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicRouteData extends RemotingSerializable {
+    private String orderTopicConf;
+    private List<QueueData> queueDatas;
+    private List<BrokerData> brokerDatas;
+    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> 
filterServerTable;
+
+
+    public TopicRouteData cloneTopicRouteData() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+        topicRouteData.setBrokerDatas(new ArrayList<BrokerData>());
+        topicRouteData.setFilterServerTable(new HashMap<String, 
List<String>>());
+        topicRouteData.setOrderTopicConf(this.orderTopicConf);
+
+        if (this.queueDatas != null) {
+            topicRouteData.getQueueDatas().addAll(this.queueDatas);
+        }
+
+        if (this.brokerDatas != null) {
+            topicRouteData.getBrokerDatas().addAll(this.brokerDatas);
+        }
+
+        if (this.filterServerTable != null) {
+            
topicRouteData.getFilterServerTable().putAll(this.filterServerTable);
+        }
+
+        return topicRouteData;
+    }
+
+
+    public List<QueueData> getQueueDatas() {
+        return queueDatas;
+    }
+
+
+    public void setQueueDatas(List<QueueData> queueDatas) {
+        this.queueDatas = queueDatas;
+    }
+
+
+    public List<BrokerData> getBrokerDatas() {
+        return brokerDatas;
+    }
+
+
+    public void setBrokerDatas(List<BrokerData> brokerDatas) {
+        this.brokerDatas = brokerDatas;
+    }
+
+    public HashMap<String, List<String>> getFilterServerTable() {
+        return filterServerTable;
+    }
+
+    public void setFilterServerTable(HashMap<String, List<String>> 
filterServerTable) {
+        this.filterServerTable = filterServerTable;
+    }
+
+    public String getOrderTopicConf() {
+        return orderTopicConf;
+    }
+
+    public void setOrderTopicConf(String orderTopicConf) {
+        this.orderTopicConf = orderTopicConf;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((brokerDatas == null) ? 0 : 
brokerDatas.hashCode());
+        result = prime * result + ((orderTopicConf == null) ? 0 : 
orderTopicConf.hashCode());
+        result = prime * result + ((queueDatas == null) ? 0 : 
queueDatas.hashCode());
+        result = prime * result + ((filterServerTable == null) ? 0 : 
filterServerTable.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        TopicRouteData other = (TopicRouteData) obj;
+        if (brokerDatas == null) {
+            if (other.brokerDatas != null)
+                return false;
+        } else if (!brokerDatas.equals(other.brokerDatas))
+            return false;
+        if (orderTopicConf == null) {
+            if (other.orderTopicConf != null)
+                return false;
+        } else if (!orderTopicConf.equals(other.orderTopicConf))
+            return false;
+        if (queueDatas == null) {
+            if (other.queueDatas != null)
+                return false;
+        } else if (!queueDatas.equals(other.queueDatas))
+            return false;
+        if (filterServerTable == null) {
+            if (other.filterServerTable != null)
+                return false;
+        } else if (!filterServerTable.equals(other.filterServerTable))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "TopicRouteData [orderTopicConf=" + orderTopicConf + ", 
queueDatas=" + queueDatas
+                + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + 
filterServerTable + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java
new file mode 100644
index 0000000..df5ec71
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.topic;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+
+public class OffsetMovedEvent extends RemotingSerializable {
+    private String consumerGroup;
+    private MessageQueue messageQueue;
+    private long offsetRequest;
+    private long offsetNew;
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+
+    public long getOffsetRequest() {
+        return offsetRequest;
+    }
+
+
+    public void setOffsetRequest(long offsetRequest) {
+        this.offsetRequest = offsetRequest;
+    }
+
+
+    public long getOffsetNew() {
+        return offsetNew;
+    }
+
+
+    public void setOffsetNew(long offsetNew) {
+        this.offsetNew = offsetNew;
+    }
+
+
+    @Override
+    public String toString() {
+        return "OffsetMovedEvent [consumerGroup=" + consumerGroup + ", 
messageQueue=" + messageQueue
+                + ", offsetRequest=" + offsetRequest + ", offsetNew=" + 
offsetNew + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java 
b/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java
new file mode 100644
index 0000000..7036fdd
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.queue;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * thread safe
+ *
+ * @author lansheng.zj
+ */
+public class ConcurrentTreeMap<K, V> {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final ReentrantLock lock;
+    private TreeMap<K, V> tree;
+    private RoundQueue<K> roundQueue;
+
+
+    public ConcurrentTreeMap(int capacity, Comparator<? super K> comparator) {
+        tree = new TreeMap<K, V>(comparator);
+        roundQueue = new RoundQueue<K>(capacity);
+        lock = new ReentrantLock(true);
+    }
+
+
+    public Map.Entry<K, V> pollFirstEntry() {
+        lock.lock();
+        try {
+            return tree.pollFirstEntry();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    public V putIfAbsentAndRetExsit(K key, V value) {
+        lock.lock();
+        try {
+            if (roundQueue.put(key)) {
+                V exsit = tree.get(key);
+                if (null == exsit) {
+                    tree.put(key, value);
+                    exsit = value;
+                }
+                log.warn("putIfAbsentAndRetExsit success. {}", key);
+                return exsit;
+            }
+
+            else {
+                V exsit = tree.get(key);
+                return exsit;
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java 
b/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java
new file mode 100644
index 0000000..f32569b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.queue;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+
+/**
+ * not thread safe
+ *
+ * @author lansheng.zj
+ */
+public class RoundQueue<E> {
+
+    private Queue<E> queue;
+    private int capacity;
+
+
+    public RoundQueue(int capacity) {
+        this.capacity = capacity;
+        queue = new LinkedList<E>();
+    }
+
+
+    public boolean put(E e) {
+        boolean ok = false;
+        if (!queue.contains(e)) {
+            if (queue.size() >= capacity) {
+                queue.poll();
+            }
+            queue.add(e);
+            ok = true;
+        }
+
+        return ok;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/running/RunningStats.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/running/RunningStats.java 
b/common/src/main/java/org/apache/rocketmq/common/running/RunningStats.java
new file mode 100644
index 0000000..106d111
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/running/RunningStats.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.common.running;
+
+public enum RunningStats {
+    commitLogMaxOffset,
+    commitLogMinOffset,
+    commitLogDiskRatio,
+    consumeQueueDiskRatio,
+    scheduleMessageOffset,
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java 
b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
new file mode 100644
index 0000000..58eedb2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.stats;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class MomentStatsItem {
+
+    private final AtomicLong value = new AtomicLong(0);
+
+    private final String statsName;
+    private final String statsKey;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final Logger log;
+
+
+    public MomentStatsItem(String statsName, String statsKey,
+                           ScheduledExecutorService scheduledExecutorService, 
Logger log) {
+        this.statsName = statsName;
+        this.statsKey = statsKey;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.log = log;
+    }
+
+
+    public void init() {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtMinutes();
+
+                    MomentStatsItem.this.value.set(0);
+                } catch (Throwable e) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextMinutesTimeMillis() - 
System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
+    }
+
+
+    public void printAtMinutes() {
+        log.info(String.format("[%s] [%s] Stats Every 5 Minutes, Value: %d",
+                this.statsName,
+                this.statsKey,
+                this.value.get()));
+    }
+
+    public AtomicLong getValue() {
+        return value;
+    }
+
+
+    public String getStatsKey() {
+        return statsKey;
+    }
+
+
+    public String getStatsName() {
+        return statsName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java 
b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
new file mode 100644
index 0000000..aba61c7
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.stats;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class MomentStatsItemSet {
+    private final ConcurrentHashMap<String/* key */, MomentStatsItem> 
statsItemTable =
+            new ConcurrentHashMap<String, MomentStatsItem>(128);
+    private final String statsName;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final Logger log;
+
+
+    public MomentStatsItemSet(String statsName, ScheduledExecutorService 
scheduledExecutorService, Logger log) {
+        this.statsName = statsName;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.log = log;
+        this.init();
+    }
+
+    public ConcurrentHashMap<String, MomentStatsItem> getStatsItemTable() {
+        return statsItemTable;
+    }
+
+    public String getStatsName() {
+        return statsName;
+    }
+
+    public void init() {
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtMinutes();
+                } catch (Throwable e) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextMinutesTimeMillis() - 
System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
+    }
+
+    private void printAtMinutes() {
+        Iterator<Entry<String, MomentStatsItem>> it = 
this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, MomentStatsItem> next = it.next();
+            next.getValue().printAtMinutes();
+        }
+    }
+
+    public void setValue(final String statsKey, final int value) {
+        MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
+        statsItem.getValue().set(value);
+    }
+
+    public MomentStatsItem getAndCreateStatsItem(final String statsKey) {
+        MomentStatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null == statsItem) {
+            statsItem =
+                    new MomentStatsItem(this.statsName, statsKey, 
this.scheduledExecutorService, this.log);
+            MomentStatsItem prev = this.statsItemTable.put(statsKey, 
statsItem);
+
+            if (null == prev) {
+
+                // statsItem.init();
+            }
+        }
+
+        return statsItem;
+    }
+}

Reply via email to