http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java new file mode 100644 index 0000000..a2a52f0 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.filtersrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +public class RegisterFilterServerResponseHeader implements CommandCustomHeader { + @CFNotNull + private String brokerName; + @CFNotNull + private long brokerId; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public long getBrokerId() { + return brokerId; + } + + + public void setBrokerId(long brokerId) { + this.brokerId = brokerId; + } + + + public String getBrokerName() { + return brokerName; + } + + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java new file mode 100644 index 0000000..1fc94a9 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.filtersrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +public class RegisterMessageFilterClassRequestHeader implements CommandCustomHeader { + @CFNotNull + private String consumerGroup; + @CFNotNull + private String topic; + @CFNotNull + private String className; + @CFNotNull + private Integer classCRC; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public String getClassName() { + return className; + } + + + public void setClassName(String className) { + this.className = className; + } + + + public Integer getClassCRC() { + return classCRC; + } + + + public void setClassCRC(Integer classCRC) { + this.classCRC = classCRC; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteKVConfigRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteKVConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteKVConfigRequestHeader.java new file mode 100644 index 0000000..47ec4b1 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteKVConfigRequestHeader.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + * + */ +public class DeleteKVConfigRequestHeader implements CommandCustomHeader { + @CFNotNull + private String namespace; + @CFNotNull + private String key; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getNamespace() { + return namespace; + } + + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + + public String getKey() { + return key; + } + + + public void setKey(String key) { + this.key = key; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteTopicInNamesrvRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteTopicInNamesrvRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteTopicInNamesrvRequestHeader.java new file mode 100644 index 0000000..5bd0632 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/DeleteTopicInNamesrvRequestHeader.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + * + */ +public class DeleteTopicInNamesrvRequestHeader implements CommandCustomHeader { + @CFNotNull + private String topic; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigRequestHeader.java new file mode 100644 index 0000000..29e4db4 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigRequestHeader.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + * + */ +public class GetKVConfigRequestHeader implements CommandCustomHeader { + @CFNotNull + private String namespace; + @CFNotNull + private String key; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getNamespace() { + return namespace; + } + + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + + public String getKey() { + return key; + } + + + public void setKey(String key) { + this.key = key; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigResponseHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigResponseHeader.java new file mode 100644 index 0000000..3280ff5 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVConfigResponseHeader.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNullable; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + * + */ +public class GetKVConfigResponseHeader implements CommandCustomHeader { + @CFNullable + private String value; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getValue() { + return value; + } + + + public void setValue(String value) { + this.value = value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVListByNamespaceRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVListByNamespaceRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVListByNamespaceRequestHeader.java new file mode 100644 index 0000000..bd2816e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetKVListByNamespaceRequestHeader.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + * + */ +public class GetKVListByNamespaceRequestHeader implements CommandCustomHeader { + @CFNotNull + private String namespace; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getNamespace() { + return namespace; + } + + + public void setNamespace(String namespace) { + this.namespace = namespace; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java new file mode 100644 index 0000000..972cf35 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: GetRouteInfoRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + */ +public class GetRouteInfoRequestHeader implements CommandCustomHeader { + @CFNotNull + private String topic; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java new file mode 100644 index 0000000..8bb681e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: GetRouteInfoResponseHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + */ +public class GetRouteInfoResponseHeader implements CommandCustomHeader { + + @Override + public void checkFields() throws RemotingCommandException { + // TODO Auto-generated method stub + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/PutKVConfigRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/PutKVConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/PutKVConfigRequestHeader.java new file mode 100644 index 0000000..01e9a5e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/PutKVConfigRequestHeader.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +public class PutKVConfigRequestHeader implements CommandCustomHeader { + @CFNotNull + private String namespace; + @CFNotNull + private String key; + @CFNotNull + private String value; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getNamespace() { + return namespace; + } + + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + + public String getKey() { + return key; + } + + + public void setKey(String key) { + this.key = key; + } + + + public String getValue() { + return value; + } + + + public void setValue(String value) { + this.value = value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java new file mode 100644 index 0000000..4c0fca5 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: RegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author lansheng.zj + */ +public class RegisterBrokerRequestHeader implements CommandCustomHeader { + @CFNotNull + private String brokerName; + @CFNotNull + private String brokerAddr; + @CFNotNull + private String clusterName; + @CFNotNull + private String haServerAddr; + @CFNotNull + private Long brokerId; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getBrokerName() { + return brokerName; + } + + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + + public String getBrokerAddr() { + return brokerAddr; + } + + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + + public String getClusterName() { + return clusterName; + } + + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + + public String getHaServerAddr() { + return haServerAddr; + } + + + public void setHaServerAddr(String haServerAddr) { + this.haServerAddr = haServerAddr; + } + + + public Long getBrokerId() { + return brokerId; + } + + + public void setBrokerId(Long brokerId) { + this.brokerId = brokerId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerResponseHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerResponseHeader.java new file mode 100644 index 0000000..9796054 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerResponseHeader.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNullable; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + * + */ +public class RegisterBrokerResponseHeader implements CommandCustomHeader { + @CFNullable + private String haServerAddr; + @CFNullable + private String masterAddr; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getHaServerAddr() { + return haServerAddr; + } + + + public void setHaServerAddr(String haServerAddr) { + this.haServerAddr = haServerAddr; + } + + + public String getMasterAddr() { + return masterAddr; + } + + + public void setMasterAddr(String masterAddr) { + this.masterAddr = masterAddr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java new file mode 100644 index 0000000..cb5b3d9 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: RegisterOrderTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + */ +public class RegisterOrderTopicRequestHeader implements CommandCustomHeader { + @CFNotNull + private String topic; + @CFNotNull + private String orderTopicString; + + + @Override + public void checkFields() throws RemotingCommandException { + // TODO Auto-generated method stub + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public String getOrderTopicString() { + return orderTopicString; + } + + + public void setOrderTopicString(String orderTopicString) { + this.orderTopicString = orderTopicString; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java new file mode 100644 index 0000000..f2d174a --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: UnRegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author lansheng.zj + */ +public class UnRegisterBrokerRequestHeader implements CommandCustomHeader { + @CFNotNull + private String brokerName; + @CFNotNull + private String brokerAddr; + @CFNotNull + private String clusterName; + @CFNotNull + private Long brokerId; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getBrokerName() { + return brokerName; + } + + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + + public String getBrokerAddr() { + return brokerAddr; + } + + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + + public String getClusterName() { + return clusterName; + } + + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + + public Long getBrokerId() { + return brokerId; + } + + + public void setBrokerId(Long brokerId) { + this.brokerId = brokerId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerRequestHeader.java new file mode 100644 index 0000000..f5aebb9 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerRequestHeader.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + * + */ +public class WipeWritePermOfBrokerRequestHeader implements CommandCustomHeader { + @CFNotNull + private String brokerName; + + + @Override + public void checkFields() throws RemotingCommandException { + + } + + + public String getBrokerName() { + return brokerName; + } + + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerResponseHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerResponseHeader.java new file mode 100644 index 0000000..e50641b --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/WipeWritePermOfBrokerResponseHeader.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + * + */ +public class WipeWritePermOfBrokerResponseHeader implements CommandCustomHeader { + @CFNotNull + private Integer wipeTopicCount; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public Integer getWipeTopicCount() { + return wipeTopicCount; + } + + + public void setWipeTopicCount(Integer wipeTopicCount) { + this.wipeTopicCount = wipeTopicCount; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java new file mode 100644 index 0000000..115a885 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: ConsumeType.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.heartbeat; + +/** + * @author shijia.wxr + */ +public enum ConsumeType { + + CONSUME_ACTIVELY("PULL"), + + CONSUME_PASSIVELY("PUSH"); + + private String typeCN; + + ConsumeType(String typeCN) { + this.typeCN = typeCN; + } + + + public String getTypeCN() { + return typeCN; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java new file mode 100644 index 0000000..233da6c --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: ConsumerData.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.heartbeat; + +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; + +import java.util.HashSet; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class ConsumerData { + private String groupName; + private ConsumeType consumeType; + private MessageModel messageModel; + private ConsumeFromWhere consumeFromWhere; + private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>(); + private boolean unitMode; + + + public String getGroupName() { + return groupName; + } + + + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + + public ConsumeType getConsumeType() { + return consumeType; + } + + + public void setConsumeType(ConsumeType consumeType) { + this.consumeType = consumeType; + } + + + public MessageModel getMessageModel() { + return messageModel; + } + + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + + public ConsumeFromWhere getConsumeFromWhere() { + return consumeFromWhere; + } + + + public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { + this.consumeFromWhere = consumeFromWhere; + } + + + public Set<SubscriptionData> getSubscriptionDataSet() { + return subscriptionDataSet; + } + + + public void setSubscriptionDataSet(Set<SubscriptionData> subscriptionDataSet) { + this.subscriptionDataSet = subscriptionDataSet; + } + + + public boolean isUnitMode() { + return unitMode; + } + + + public void setUnitMode(boolean isUnitMode) { + this.unitMode = isUnitMode; + } + + + @Override + public String toString() { + return "ConsumerData [groupName=" + groupName + ", consumeType=" + consumeType + ", messageModel=" + + messageModel + ", consumeFromWhere=" + consumeFromWhere + ", unitMode=" + unitMode + + ", subscriptionDataSet=" + subscriptionDataSet + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java new file mode 100644 index 0000000..8fa5b17 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: HeartbeatData.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.heartbeat; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashSet; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class HeartbeatData extends RemotingSerializable { + private String clientID; + private Set<ProducerData> producerDataSet = new HashSet<ProducerData>(); + private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>(); + + + public String getClientID() { + return clientID; + } + + + public void setClientID(String clientID) { + this.clientID = clientID; + } + + + public Set<ProducerData> getProducerDataSet() { + return producerDataSet; + } + + + public void setProducerDataSet(Set<ProducerData> producerDataSet) { + this.producerDataSet = producerDataSet; + } + + + public Set<ConsumerData> getConsumerDataSet() { + return consumerDataSet; + } + + + public void setConsumerDataSet(Set<ConsumerData> consumerDataSet) { + this.consumerDataSet = consumerDataSet; + } + + + @Override + public String toString() { + return "HeartbeatData [clientID=" + clientID + ", producerDataSet=" + producerDataSet + + ", consumerDataSet=" + consumerDataSet + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java new file mode 100644 index 0000000..4600c6f --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: MessageModel.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.heartbeat; + +/** + * Message model + * + * @author shijia.wxr + */ +public enum MessageModel { + /** + * broadcast + */ + BROADCASTING("BROADCASTING"), + /** + * clustering + */ + CLUSTERING("CLUSTERING"); + + private String modeCN; + + MessageModel(String modeCN) { + this.modeCN = modeCN; + } + + + public String getModeCN() { + return modeCN; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java new file mode 100644 index 0000000..c83b14c --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: ProducerData.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.heartbeat; + +/** + * @author shijia.wxr + */ +public class ProducerData { + private String groupName; + + + public String getGroupName() { + return groupName; + } + + + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + + @Override + public String toString() { + return "ProducerData [groupName=" + groupName + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java new file mode 100644 index 0000000..28b49f1 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: SubscriptionData.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.heartbeat; + +import com.alibaba.fastjson.annotation.JSONField; + +import java.util.HashSet; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class SubscriptionData implements Comparable<SubscriptionData> { + public final static String SUB_ALL = "*"; + private boolean classFilterMode = false; + private String topic; + private String subString; + private Set<String> tagsSet = new HashSet<String>(); + private Set<Integer> codeSet = new HashSet<Integer>(); + private long subVersion = System.currentTimeMillis(); + + @JSONField(serialize = false) + private String filterClassSource; + + + public SubscriptionData() { + + } + + + public SubscriptionData(String topic, String subString) { + super(); + this.topic = topic; + this.subString = subString; + } + + public String getFilterClassSource() { + return filterClassSource; + } + + public void setFilterClassSource(String filterClassSource) { + this.filterClassSource = filterClassSource; + } + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public String getSubString() { + return subString; + } + + + public void setSubString(String subString) { + this.subString = subString; + } + + + public Set<String> getTagsSet() { + return tagsSet; + } + + + public void setTagsSet(Set<String> tagsSet) { + this.tagsSet = tagsSet; + } + + + public long getSubVersion() { + return subVersion; + } + + + public void setSubVersion(long subVersion) { + this.subVersion = subVersion; + } + + + public Set<Integer> getCodeSet() { + return codeSet; + } + + + public void setCodeSet(Set<Integer> codeSet) { + this.codeSet = codeSet; + } + + + public boolean isClassFilterMode() { + return classFilterMode; + } + + + public void setClassFilterMode(boolean classFilterMode) { + this.classFilterMode = classFilterMode; + } + + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (classFilterMode ? 1231 : 1237); + result = prime * result + ((codeSet == null) ? 0 : codeSet.hashCode()); + result = prime * result + ((subString == null) ? 0 : subString.hashCode()); + result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode()); + result = prime * result + ((topic == null) ? 0 : topic.hashCode()); + return result; + } + + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SubscriptionData other = (SubscriptionData) obj; + if (classFilterMode != other.classFilterMode) + return false; + if (codeSet == null) { + if (other.codeSet != null) + return false; + } else if (!codeSet.equals(other.codeSet)) + return false; + if (subString == null) { + if (other.subString != null) + return false; + } else if (!subString.equals(other.subString)) + return false; + if (subVersion != other.subVersion) + return false; + if (tagsSet == null) { + if (other.tagsSet != null) + return false; + } else if (!tagsSet.equals(other.tagsSet)) + return false; + if (topic == null) { + if (other.topic != null) + return false; + } else if (!topic.equals(other.topic)) + return false; + return true; + } + + + @Override + public String toString() { + return "SubscriptionData [classFilterMode=" + classFilterMode + ", topic=" + topic + ", subString=" + + subString + ", tagsSet=" + tagsSet + ", codeSet=" + codeSet + ", subVersion=" + subVersion + + "]"; + } + + + @Override + public int compareTo(SubscriptionData other) { + String thisValue = this.topic + "@" + this.subString; + String otherValue = other.topic + "@" + other.subString; + return thisValue.compareTo(otherValue); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java new file mode 100644 index 0000000..1696cd6 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: BrokerData.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.route; + +import org.apache.rocketmq.common.MixAll; + +import java.util.HashMap; +import java.util.Map; + + +/** + * @author shijia.wxr + * + */ +public class BrokerData implements Comparable<BrokerData> { + private String cluster; + private String brokerName; + private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; + + public String selectBrokerAddr() { + String value = this.brokerAddrs.get(MixAll.MASTER_ID); + if (null == value) { + for (Map.Entry<Long, String> entry : this.brokerAddrs.entrySet()) { + return entry.getValue(); + } + } + + return value; + } + + public HashMap<Long, String> getBrokerAddrs() { + return brokerAddrs; + } + + public void setBrokerAddrs(HashMap<Long, String> brokerAddrs) { + this.brokerAddrs = brokerAddrs; + } + + public String getCluster() { + return cluster; + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((brokerAddrs == null) ? 0 : brokerAddrs.hashCode()); + result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + BrokerData other = (BrokerData) obj; + if (brokerAddrs == null) { + if (other.brokerAddrs != null) + return false; + } else if (!brokerAddrs.equals(other.brokerAddrs)) + return false; + if (brokerName == null) { + if (other.brokerName != null) + return false; + } else if (!brokerName.equals(other.brokerName)) + return false; + return true; + } + + @Override + public String toString() { + return "BrokerData [brokerName=" + brokerName + ", brokerAddrs=" + brokerAddrs + "]"; + } + + @Override + public int compareTo(BrokerData o) { + return this.brokerName.compareTo(o.getBrokerName()); + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java new file mode 100644 index 0000000..de736be --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: QueueData.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.route; + +public class QueueData implements Comparable<QueueData> { + private String brokerName; + private int readQueueNums; + private int writeQueueNums; + private int perm; + private int topicSynFlag; + + public int getReadQueueNums() { + return readQueueNums; + } + + public void setReadQueueNums(int readQueueNums) { + this.readQueueNums = readQueueNums; + } + + public int getWriteQueueNums() { + return writeQueueNums; + } + + public void setWriteQueueNums(int writeQueueNums) { + this.writeQueueNums = writeQueueNums; + } + + public int getPerm() { + return perm; + } + + public void setPerm(int perm) { + this.perm = perm; + } + + public int getTopicSynFlag() { + return topicSynFlag; + } + + public void setTopicSynFlag(int topicSynFlag) { + this.topicSynFlag = topicSynFlag; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode()); + result = prime * result + perm; + result = prime * result + readQueueNums; + result = prime * result + writeQueueNums; + result = prime * result + topicSynFlag; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + QueueData other = (QueueData) obj; + if (brokerName == null) { + if (other.brokerName != null) + return false; + } else if (!brokerName.equals(other.brokerName)) + return false; + if (perm != other.perm) + return false; + if (readQueueNums != other.readQueueNums) + return false; + if (writeQueueNums != other.writeQueueNums) + return false; + if (topicSynFlag != other.topicSynFlag) + return false; + return true; + } + + @Override + public String toString() { + return "QueueData [brokerName=" + brokerName + ", readQueueNums=" + readQueueNums + + ", writeQueueNums=" + writeQueueNums + ", perm=" + perm + ", topicSynFlag=" + topicSynFlag + + "]"; + } + + @Override + public int compareTo(QueueData o) { + return this.brokerName.compareTo(o.getBrokerName()); + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java new file mode 100644 index 0000000..13c5273 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: TopicRouteData.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package org.apache.rocketmq.common.protocol.route; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class TopicRouteData extends RemotingSerializable { + private String orderTopicConf; + private List<QueueData> queueDatas; + private List<BrokerData> brokerDatas; + private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; + + + public TopicRouteData cloneTopicRouteData() { + TopicRouteData topicRouteData = new TopicRouteData(); + topicRouteData.setQueueDatas(new ArrayList<QueueData>()); + topicRouteData.setBrokerDatas(new ArrayList<BrokerData>()); + topicRouteData.setFilterServerTable(new HashMap<String, List<String>>()); + topicRouteData.setOrderTopicConf(this.orderTopicConf); + + if (this.queueDatas != null) { + topicRouteData.getQueueDatas().addAll(this.queueDatas); + } + + if (this.brokerDatas != null) { + topicRouteData.getBrokerDatas().addAll(this.brokerDatas); + } + + if (this.filterServerTable != null) { + topicRouteData.getFilterServerTable().putAll(this.filterServerTable); + } + + return topicRouteData; + } + + + public List<QueueData> getQueueDatas() { + return queueDatas; + } + + + public void setQueueDatas(List<QueueData> queueDatas) { + this.queueDatas = queueDatas; + } + + + public List<BrokerData> getBrokerDatas() { + return brokerDatas; + } + + + public void setBrokerDatas(List<BrokerData> brokerDatas) { + this.brokerDatas = brokerDatas; + } + + public HashMap<String, List<String>> getFilterServerTable() { + return filterServerTable; + } + + public void setFilterServerTable(HashMap<String, List<String>> filterServerTable) { + this.filterServerTable = filterServerTable; + } + + public String getOrderTopicConf() { + return orderTopicConf; + } + + public void setOrderTopicConf(String orderTopicConf) { + this.orderTopicConf = orderTopicConf; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((brokerDatas == null) ? 0 : brokerDatas.hashCode()); + result = prime * result + ((orderTopicConf == null) ? 0 : orderTopicConf.hashCode()); + result = prime * result + ((queueDatas == null) ? 0 : queueDatas.hashCode()); + result = prime * result + ((filterServerTable == null) ? 0 : filterServerTable.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + TopicRouteData other = (TopicRouteData) obj; + if (brokerDatas == null) { + if (other.brokerDatas != null) + return false; + } else if (!brokerDatas.equals(other.brokerDatas)) + return false; + if (orderTopicConf == null) { + if (other.orderTopicConf != null) + return false; + } else if (!orderTopicConf.equals(other.orderTopicConf)) + return false; + if (queueDatas == null) { + if (other.queueDatas != null) + return false; + } else if (!queueDatas.equals(other.queueDatas)) + return false; + if (filterServerTable == null) { + if (other.filterServerTable != null) + return false; + } else if (!filterServerTable.equals(other.filterServerTable)) + return false; + return true; + } + + @Override + public String toString() { + return "TopicRouteData [orderTopicConf=" + orderTopicConf + ", queueDatas=" + queueDatas + + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java b/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java new file mode 100644 index 0000000..df5ec71 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.topic; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + + +public class OffsetMovedEvent extends RemotingSerializable { + private String consumerGroup; + private MessageQueue messageQueue; + private long offsetRequest; + private long offsetNew; + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + + public void setMessageQueue(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + + public long getOffsetRequest() { + return offsetRequest; + } + + + public void setOffsetRequest(long offsetRequest) { + this.offsetRequest = offsetRequest; + } + + + public long getOffsetNew() { + return offsetNew; + } + + + public void setOffsetNew(long offsetNew) { + this.offsetNew = offsetNew; + } + + + @Override + public String toString() { + return "OffsetMovedEvent [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue + + ", offsetRequest=" + offsetRequest + ", offsetNew=" + offsetNew + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java b/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java new file mode 100644 index 0000000..7036fdd --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.queue; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Comparator; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * thread safe + * + * @author lansheng.zj + */ +public class ConcurrentTreeMap<K, V> { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final ReentrantLock lock; + private TreeMap<K, V> tree; + private RoundQueue<K> roundQueue; + + + public ConcurrentTreeMap(int capacity, Comparator<? super K> comparator) { + tree = new TreeMap<K, V>(comparator); + roundQueue = new RoundQueue<K>(capacity); + lock = new ReentrantLock(true); + } + + + public Map.Entry<K, V> pollFirstEntry() { + lock.lock(); + try { + return tree.pollFirstEntry(); + } finally { + lock.unlock(); + } + } + + + public V putIfAbsentAndRetExsit(K key, V value) { + lock.lock(); + try { + if (roundQueue.put(key)) { + V exsit = tree.get(key); + if (null == exsit) { + tree.put(key, value); + exsit = value; + } + log.warn("putIfAbsentAndRetExsit success. {}", key); + return exsit; + } + + else { + V exsit = tree.get(key); + return exsit; + } + } finally { + lock.unlock(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java b/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java new file mode 100644 index 0000000..f32569b --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.queue; + +import java.util.LinkedList; +import java.util.Queue; + + +/** + * not thread safe + * + * @author lansheng.zj + */ +public class RoundQueue<E> { + + private Queue<E> queue; + private int capacity; + + + public RoundQueue(int capacity) { + this.capacity = capacity; + queue = new LinkedList<E>(); + } + + + public boolean put(E e) { + boolean ok = false; + if (!queue.contains(e)) { + if (queue.size() >= capacity) { + queue.poll(); + } + queue.add(e); + ok = true; + } + + return ok; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/running/RunningStats.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/running/RunningStats.java b/common/src/main/java/org/apache/rocketmq/common/running/RunningStats.java new file mode 100644 index 0000000..106d111 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/running/RunningStats.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.running; + +public enum RunningStats { + commitLogMaxOffset, + commitLogMinOffset, + commitLogDiskRatio, + consumeQueueDiskRatio, + scheduleMessageOffset, +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java new file mode 100644 index 0000000..58eedb2 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.stats; + +import org.apache.rocketmq.common.UtilAll; +import org.slf4j.Logger; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + + +public class MomentStatsItem { + + private final AtomicLong value = new AtomicLong(0); + + private final String statsName; + private final String statsKey; + private final ScheduledExecutorService scheduledExecutorService; + private final Logger log; + + + public MomentStatsItem(String statsName, String statsKey, + ScheduledExecutorService scheduledExecutorService, Logger log) { + this.statsName = statsName; + this.statsKey = statsKey; + this.scheduledExecutorService = scheduledExecutorService; + this.log = log; + } + + + public void init() { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtMinutes(); + + MomentStatsItem.this.value.set(0); + } catch (Throwable e) { + } + } + }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS); + } + + + public void printAtMinutes() { + log.info(String.format("[%s] [%s] Stats Every 5 Minutes, Value: %d", + this.statsName, + this.statsKey, + this.value.get())); + } + + public AtomicLong getValue() { + return value; + } + + + public String getStatsKey() { + return statsKey; + } + + + public String getStatsName() { + return statsName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java new file mode 100644 index 0000000..aba61c7 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.stats; + +import org.apache.rocketmq.common.UtilAll; +import org.slf4j.Logger; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + +public class MomentStatsItemSet { + private final ConcurrentHashMap<String/* key */, MomentStatsItem> statsItemTable = + new ConcurrentHashMap<String, MomentStatsItem>(128); + private final String statsName; + private final ScheduledExecutorService scheduledExecutorService; + private final Logger log; + + + public MomentStatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) { + this.statsName = statsName; + this.scheduledExecutorService = scheduledExecutorService; + this.log = log; + this.init(); + } + + public ConcurrentHashMap<String, MomentStatsItem> getStatsItemTable() { + return statsItemTable; + } + + public String getStatsName() { + return statsName; + } + + public void init() { + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtMinutes(); + } catch (Throwable e) { + } + } + }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS); + } + + private void printAtMinutes() { + Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, MomentStatsItem> next = it.next(); + next.getValue().printAtMinutes(); + } + } + + public void setValue(final String statsKey, final int value) { + MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey); + statsItem.getValue().set(value); + } + + public MomentStatsItem getAndCreateStatsItem(final String statsKey) { + MomentStatsItem statsItem = this.statsItemTable.get(statsKey); + if (null == statsItem) { + statsItem = + new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); + MomentStatsItem prev = this.statsItemTable.put(statsKey, statsItem); + + if (null == prev) { + + // statsItem.init(); + } + } + + return statsItem; + } +}