http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java index c9e534f..4705bed 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java @@ -6,30 +6,28 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.namesrv.kvconfig; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.namesrv.NamesrvController; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; - +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KVConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); @@ -38,19 +36,17 @@ public class KVConfigManager { private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = - new HashMap<String, HashMap<String, String>>(); - + new HashMap<String, HashMap<String, String>>(); public KVConfigManager(NamesrvController namesrvController) { this.namesrvController = namesrvController; } - public void load() { String content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath()); if (content != null) { KVConfigSerializeWrapper kvConfigSerializeWrapper = - KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class); + KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class); if (null != kvConfigSerializeWrapper) { this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable()); log.info("load KV config table OK"); @@ -58,7 +54,6 @@ public class KVConfigManager { } } - public void putKVConfig(final String namespace, final String key, final String value) { try { this.lock.writeLock().lockInterruptibly(); @@ -73,10 +68,10 @@ public class KVConfigManager { final String prev = kvTable.put(key, value); if (null != prev) { log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}", // - namespace, key, value); + namespace, key, value); } else { log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}", // - namespace, key, value); + namespace, key, value); } } finally { this.lock.writeLock().unlock(); @@ -102,7 +97,7 @@ public class KVConfigManager { } } catch (IOException e) { log.error("persist kvconfig Exception, " - + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e); + + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e); } finally { this.lock.readLock().unlock(); } @@ -120,7 +115,7 @@ public class KVConfigManager { if (null != kvTable) { String value = kvTable.remove(key); log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", // - namespace, key, value); + namespace, key, value); } } finally { this.lock.writeLock().unlock(); @@ -179,14 +174,14 @@ public class KVConfigManager { { log.info("configTable SIZE: {}", this.configTable.size()); Iterator<Entry<String, HashMap<String, String>>> it = - this.configTable.entrySet().iterator(); + this.configTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, HashMap<String, String>> next = it.next(); Iterator<Entry<String, String>> itSub = next.getValue().entrySet().iterator(); while (itSub.hasNext()) { Entry<String, String> nextSub = itSub.next(); log.info("configTable NS: {} Key: {} Value: {}", next.getKey(), nextSub.getKey(), - nextSub.getValue()); + nextSub.getValue()); } } }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java index 6465927..e35a37c 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java @@ -6,30 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.namesrv.kvconfig; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - import java.util.HashMap; - +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class KVConfigSerializeWrapper extends RemotingSerializable { private HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable; - public HashMap<String, HashMap<String, String>> getConfigTable() { return configTable; } - public void setConfigTable(HashMap<String, HashMap<String, String>> configTable) { this.configTable = configTable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java index 9ee56a4..95410fa 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java @@ -6,16 +6,17 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.namesrv.processor; +import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.help.FAQUrl; @@ -27,17 +28,14 @@ import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class ClusterTestRequestProcessor extends DefaultRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final DefaultMQAdminExt adminExt; private final String productEnvName; - public ClusterTestRequestProcessor(NamesrvController namesrvController, String productEnvName) { super(namesrvController); this.productEnvName = productEnvName; @@ -51,18 +49,17 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor { } } - @Override public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = - (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); + (GetRouteInfoRequestHeader)request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); if (topicRouteData != null) { String orderTopicConf = - this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, - requestHeader.getTopic()); + this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, + requestHeader.getTopic()); topicRouteData.setOrderTopicConf(orderTopicConf); } else { try { @@ -82,7 +79,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor { response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() - + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); + + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index e47f300..0135274 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -6,16 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.namesrv.processor; +import io.netty.channel.ChannelHandlerContext; +import java.io.UnsupportedEncodingException; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion.Version; import org.apache.rocketmq.common.MixAll; @@ -28,40 +32,43 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader; -import org.apache.rocketmq.common.protocol.header.namesrv.*; +import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.DeleteTopicInNamesrvRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.UnsupportedEncodingException; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicLong; - - public class DefaultRequestProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); protected final NamesrvController namesrvController; - public DefaultRequestProcessor(NamesrvController namesrvController) { this.namesrvController = namesrvController; } - @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { if (log.isDebugEnabled()) { log.debug("receive request, {} {} {}", - request.getCode(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - request); + request.getCode(), + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + request); } switch (request.getCode()) { @@ -75,8 +82,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); - } - else { + } else { return this.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER: @@ -121,12 +127,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final PutKVConfigRequestHeader requestHeader = - (PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class); + (PutKVConfigRequestHeader)request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class); this.namesrvController.getKvConfigManager().putKVConfig( - requestHeader.getNamespace(), - requestHeader.getKey(), - requestHeader.getValue() + requestHeader.getNamespace(), + requestHeader.getKey(), + requestHeader.getValue() ); response.setCode(ResponseCode.SUCCESS); @@ -136,13 +142,13 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand getKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class); - final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response.readCustomHeader(); + final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader)response.readCustomHeader(); final GetKVConfigRequestHeader requestHeader = - (GetKVConfigRequestHeader) request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class); + (GetKVConfigRequestHeader)request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class); String value = this.namesrvController.getKvConfigManager().getKVConfig( - requestHeader.getNamespace(), - requestHeader.getKey() + requestHeader.getNamespace(), + requestHeader.getKey() ); if (value != null) { @@ -160,11 +166,11 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final DeleteKVConfigRequestHeader requestHeader = - (DeleteKVConfigRequestHeader) request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class); + (DeleteKVConfigRequestHeader)request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class); this.namesrvController.getKvConfigManager().deleteKVConfig( - requestHeader.getNamespace(), - requestHeader.getKey() + requestHeader.getNamespace(), + requestHeader.getKey() ); response.setCode(ResponseCode.SUCCESS); @@ -173,11 +179,11 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { } public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class); - final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader(); + final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader)response.readCustomHeader(); final RegisterBrokerRequestHeader requestHeader = - (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); + (RegisterBrokerRequestHeader)request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); @@ -189,19 +195,18 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { } RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( - requestHeader.getClusterName(), - requestHeader.getBrokerAddr(), - requestHeader.getBrokerName(), - requestHeader.getBrokerId(), - requestHeader.getHaServerAddr(), - registerBrokerBody.getTopicConfigSerializeWrapper(), - registerBrokerBody.getFilterServerList(), - ctx.channel()); + requestHeader.getClusterName(), + requestHeader.getBrokerAddr(), + requestHeader.getBrokerName(), + requestHeader.getBrokerId(), + requestHeader.getHaServerAddr(), + registerBrokerBody.getTopicConfigSerializeWrapper(), + registerBrokerBody.getFilterServerList(), + ctx.channel()); responseHeader.setHaServerAddr(result.getHaServerAddr()); responseHeader.setMasterAddr(result.getMasterAddr()); - byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG); response.setBody(jsonValue); @@ -212,9 +217,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class); - final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader(); + final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader)response.readCustomHeader(); final RegisterBrokerRequestHeader requestHeader = - (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); + (RegisterBrokerRequestHeader)request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); TopicConfigSerializeWrapper topicConfigWrapper; if (request.getBody() != null) { @@ -226,20 +231,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { } RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( - requestHeader.getClusterName(), - requestHeader.getBrokerAddr(), - requestHeader.getBrokerName(), - requestHeader.getBrokerId(), - requestHeader.getHaServerAddr(), - topicConfigWrapper, - null, - ctx.channel() + requestHeader.getClusterName(), + requestHeader.getBrokerAddr(), + requestHeader.getBrokerName(), + requestHeader.getBrokerId(), + requestHeader.getHaServerAddr(), + topicConfigWrapper, + null, + ctx.channel() ); responseHeader.setHaServerAddr(result.getHaServerAddr()); responseHeader.setMasterAddr(result.getMasterAddr()); - byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG); response.setBody(jsonValue); response.setCode(ResponseCode.SUCCESS); @@ -250,13 +254,13 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final UnRegisterBrokerRequestHeader requestHeader = - (UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class); + (UnRegisterBrokerRequestHeader)request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class); this.namesrvController.getRouteInfoManager().unregisterBroker( - requestHeader.getClusterName(), - requestHeader.getBrokerAddr(), - requestHeader.getBrokerName(), - requestHeader.getBrokerId()); + requestHeader.getClusterName(), + requestHeader.getBrokerAddr(), + requestHeader.getBrokerName(), + requestHeader.getBrokerId()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); @@ -266,15 +270,15 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = - (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); + (GetRouteInfoRequestHeader)request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); if (topicRouteData != null) { if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) { String orderTopicConf = - this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, - requestHeader.getTopic()); + this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, + requestHeader.getTopic()); topicRouteData.setOrderTopicConf(orderTopicConf); } @@ -287,7 +291,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() - + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); + + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } @@ -304,16 +308,16 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(WipeWritePermOfBrokerResponseHeader.class); - final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader) response.readCustomHeader(); + final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader)response.readCustomHeader(); final WipeWritePermOfBrokerRequestHeader requestHeader = - (WipeWritePermOfBrokerRequestHeader) request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class); + (WipeWritePermOfBrokerRequestHeader)request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class); int wipeTopicCnt = this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName()); log.info("wipe write perm of broker[{}], client: {}, {}", - requestHeader.getBrokerName(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - wipeTopicCnt); + requestHeader.getBrokerName(), + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + wipeTopicCnt); responseHeader.setWipeTopicCount(wipeTopicCnt); response.setCode(ResponseCode.SUCCESS); @@ -335,7 +339,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final DeleteTopicInNamesrvRequestHeader requestHeader = - (DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class); + (DeleteTopicInNamesrvRequestHeader)request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class); this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic()); @@ -347,10 +351,10 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetKVListByNamespaceRequestHeader requestHeader = - (GetKVListByNamespaceRequestHeader) request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class); + (GetKVListByNamespaceRequestHeader)request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class); byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace( - requestHeader.getNamespace()); + requestHeader.getNamespace()); if (null != jsonValue) { response.setBody(jsonValue); response.setCode(ResponseCode.SUCCESS); @@ -366,7 +370,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetTopicsByClusterRequestHeader requestHeader = - (GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class); + (GetTopicsByClusterRequestHeader)request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class); byte[] body = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster()); @@ -376,7 +380,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -388,7 +391,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -400,7 +402,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -412,9 +413,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getHasUnitSubUnUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java index 47e1dc9..f4bbf24 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java @@ -6,51 +6,45 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.namesrv.routeinfo; +import io.netty.channel.Channel; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.ChannelEventListener; -import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class BrokerHousekeepingService implements ChannelEventListener { private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final NamesrvController namesrvController; - public BrokerHousekeepingService(NamesrvController namesrvController) { this.namesrvController = namesrvController; } - @Override public void onChannelConnect(String remoteAddr, Channel channel) { } - @Override public void onChannelClose(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } - @Override public void onChannelException(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } - @Override public void onChannelIdle(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 82b4cbf..e440e61 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -16,6 +16,18 @@ */ package org.apache.rocketmq.namesrv.routeinfo; +import io.netty.channel.Channel; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -30,17 +42,9 @@ import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.remoting.common.RemotingUtil; -import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - - public class RouteInfoManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; @@ -51,7 +55,6 @@ public class RouteInfoManager { private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; - public RouteInfoManager() { this.topicQueueTable = new HashMap<String, List<QueueData>>(1024); this.brokerAddrTable = new HashMap<String, BrokerData>(128); @@ -97,20 +100,19 @@ public class RouteInfoManager { } public RegisterBrokerResult registerBroker( - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId, - final String haServerAddr, - final TopicConfigSerializeWrapper topicConfigWrapper, - final List<String> filterServerList, - final Channel channel) { + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId, + final String haServerAddr, + final TopicConfigSerializeWrapper topicConfigWrapper, + final List<String> filterServerList, + final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { this.lock.writeLock().lockInterruptibly(); - Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); @@ -120,7 +122,6 @@ public class RouteInfoManager { boolean registerFirst = false; - BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; @@ -134,13 +135,12 @@ public class RouteInfoManager { String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); - if (null != topicConfigWrapper // - && MixAll.MASTER_ID == brokerId) { + && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())// - || registerFirst) { + || registerFirst) { ConcurrentHashMap<String, TopicConfig> tcTable = - topicConfigWrapper.getTopicConfigTable(); + topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); @@ -149,18 +149,16 @@ public class RouteInfoManager { } } - BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, - new BrokerLiveInfo( - System.currentTimeMillis(), - topicConfigWrapper.getDataVersion(), - channel, - haServerAddr)); + new BrokerLiveInfo( + System.currentTimeMillis(), + topicConfigWrapper.getDataVersion(), + channel, + haServerAddr)); if (null == prevBrokerLiveInfo) { log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr); } - if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); @@ -169,7 +167,6 @@ public class RouteInfoManager { } } - if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { @@ -224,7 +221,7 @@ public class RouteInfoManager { addNewOne = false; } else { log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, - queueData); + queueData); it.remove(); } } @@ -274,18 +271,18 @@ public class RouteInfoManager { } public void unregisterBroker( - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId) { + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId) { try { try { this.lock.writeLock().lockInterruptibly(); BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr); if (brokerLiveInfo != null) { log.info("unregisterBroker, remove from brokerLiveTable {}, {}", - brokerLiveInfo != null ? "OK" : "Failed", - brokerAddr + brokerLiveInfo != null ? "OK" : "Failed", + brokerAddr ); } @@ -296,14 +293,14 @@ public class RouteInfoManager { if (null != brokerData) { String addr = brokerData.getBrokerAddrs().remove(brokerId); log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}", - addr != null ? "OK" : "Failed", - brokerAddr + addr != null ? "OK" : "Failed", + brokerAddr ); if (brokerData.getBrokerAddrs().isEmpty()) { this.brokerAddrTable.remove(brokerName); log.info("unregisterBroker, remove name from brokerAddrTable OK, {}", - brokerName + brokerName ); removeBrokerName = true; @@ -315,13 +312,13 @@ public class RouteInfoManager { if (nameSet != null) { boolean removed = nameSet.remove(brokerName); log.info("unregisterBroker, remove name from clusterAddrTable {}, {}", - removed ? "OK" : "Failed", - brokerName); + removed ? "OK" : "Failed", + brokerName); if (nameSet.isEmpty()) { this.clusterAddrTable.remove(clusterName); log.info("unregisterBroker, remove cluster from clusterAddrTable {}", - clusterName + clusterName ); } } @@ -377,7 +374,6 @@ public class RouteInfoManager { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; - Iterator<QueueData> it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); @@ -389,8 +385,8 @@ public class RouteInfoManager { if (null != brokerData) { BrokerData brokerDataClone = new BrokerData(); brokerDataClone.setBrokerName(brokerData.getBrokerName()); - brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData - .getBrokerAddrs().clone()); + brokerDataClone.setBrokerAddrs((HashMap<Long, String>)brokerData + .getBrokerAddrs().clone()); brokerDataList.add(brokerDataClone); foundBrokerData = true; for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { @@ -439,7 +435,7 @@ public class RouteInfoManager { try { this.lock.readLock().lockInterruptibly(); Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable = - this.brokerLiveTable.entrySet().iterator(); + this.brokerLiveTable.entrySet().iterator(); while (itBrokerLiveTable.hasNext()) { Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next(); if (entry.getValue().getChannel() == channel) { @@ -461,7 +457,6 @@ public class RouteInfoManager { log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound); } - if (brokerAddrFound != null && brokerAddrFound.length() > 0) { try { @@ -472,7 +467,7 @@ public class RouteInfoManager { String brokerNameFound = null; boolean removeBrokerName = false; Iterator<Entry<String, BrokerData>> itBrokerAddrTable = - this.brokerAddrTable.entrySet().iterator(); + this.brokerAddrTable.entrySet().iterator(); while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { BrokerData brokerData = itBrokerAddrTable.next().getValue(); @@ -485,7 +480,7 @@ public class RouteInfoManager { brokerNameFound = brokerData.getBrokerName(); it.remove(); log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", - brokerId, brokerAddr); + brokerId, brokerAddr); break; } } @@ -494,7 +489,7 @@ public class RouteInfoManager { removeBrokerName = true; itBrokerAddrTable.remove(); log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", - brokerData.getBrokerName()); + brokerData.getBrokerName()); } } @@ -507,12 +502,11 @@ public class RouteInfoManager { boolean removed = brokerNames.remove(brokerNameFound); if (removed) { log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", - brokerNameFound, clusterName); - + brokerNameFound, clusterName); if (brokerNames.isEmpty()) { log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", - clusterName); + clusterName); it.remove(); } @@ -523,7 +517,7 @@ public class RouteInfoManager { if (removeBrokerName) { Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = - this.topicQueueTable.entrySet().iterator(); + this.topicQueueTable.entrySet().iterator(); while (itTopicQueueTable.hasNext()) { Entry<String, List<QueueData>> entry = itTopicQueueTable.next(); String topic = entry.getKey(); @@ -535,14 +529,14 @@ public class RouteInfoManager { if (queueData.getBrokerName().equals(brokerNameFound)) { itQueueData.remove(); log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", - topic, queueData); + topic, queueData); } } if (queueDataList.isEmpty()) { itTopicQueueTable.remove(); log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", - topic); + topic); } } } @@ -603,7 +597,6 @@ public class RouteInfoManager { } } - public byte[] getSystemTopicList() { TopicList topicList = new TopicList(); try { @@ -644,7 +637,7 @@ public class RouteInfoManager { Set<String> brokerNameSet = this.clusterAddrTable.get(cluster); for (String brokerName : brokerNameSet) { Iterator<Entry<String, List<QueueData>>> topicTableIt = - this.topicQueueTable.entrySet().iterator(); + this.topicQueueTable.entrySet().iterator(); while (topicTableIt.hasNext()) { Entry<String, List<QueueData>> topicEntry = topicTableIt.next(); String topic = topicEntry.getKey(); @@ -673,13 +666,13 @@ public class RouteInfoManager { try { this.lock.readLock().lockInterruptibly(); Iterator<Entry<String, List<QueueData>>> topicTableIt = - this.topicQueueTable.entrySet().iterator(); + this.topicQueueTable.entrySet().iterator(); while (topicTableIt.hasNext()) { Entry<String, List<QueueData>> topicEntry = topicTableIt.next(); String topic = topicEntry.getKey(); List<QueueData> queueDatas = topicEntry.getValue(); if (queueDatas != null && queueDatas.size() > 0 - && TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())) { + && TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())) { topicList.getTopicList().add(topic); } } @@ -699,13 +692,13 @@ public class RouteInfoManager { try { this.lock.readLock().lockInterruptibly(); Iterator<Entry<String, List<QueueData>>> topicTableIt = - this.topicQueueTable.entrySet().iterator(); + this.topicQueueTable.entrySet().iterator(); while (topicTableIt.hasNext()) { Entry<String, List<QueueData>> topicEntry = topicTableIt.next(); String topic = topicEntry.getKey(); List<QueueData> queueDatas = topicEntry.getValue(); if (queueDatas != null && queueDatas.size() > 0 - && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) { + && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) { topicList.getTopicList().add(topic); } } @@ -725,14 +718,14 @@ public class RouteInfoManager { try { this.lock.readLock().lockInterruptibly(); Iterator<Entry<String, List<QueueData>>> topicTableIt = - this.topicQueueTable.entrySet().iterator(); + this.topicQueueTable.entrySet().iterator(); while (topicTableIt.hasNext()) { Entry<String, List<QueueData>> topicEntry = topicTableIt.next(); String topic = topicEntry.getKey(); List<QueueData> queueDatas = topicEntry.getValue(); if (queueDatas != null && queueDatas.size() > 0 - && !TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag()) - && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) { + && !TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag()) + && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) { topicList.getTopicList().add(topic); } } @@ -747,66 +740,55 @@ public class RouteInfoManager { } } - class BrokerLiveInfo { private long lastUpdateTimestamp; private DataVersion dataVersion; private Channel channel; private String haServerAddr; - public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel, - String haServerAddr) { + String haServerAddr) { this.lastUpdateTimestamp = lastUpdateTimestamp; this.dataVersion = dataVersion; this.channel = channel; this.haServerAddr = haServerAddr; } - public long getLastUpdateTimestamp() { return lastUpdateTimestamp; } - public void setLastUpdateTimestamp(long lastUpdateTimestamp) { this.lastUpdateTimestamp = lastUpdateTimestamp; } - public DataVersion getDataVersion() { return dataVersion; } - public void setDataVersion(DataVersion dataVersion) { this.dataVersion = dataVersion; } - public Channel getChannel() { return channel; } - public void setChannel(Channel channel) { this.channel = channel; } - public String getHaServerAddr() { return haServerAddr; } - public void setHaServerAddr(String haServerAddr) { this.haServerAddr = haServerAddr; } - @Override public String toString() { return "BrokerLiveInfo [lastUpdateTimestamp=" + lastUpdateTimestamp + ", dataVersion=" + dataVersion - + ", channel=" + channel + ", haServerAddr=" + haServerAddr + "]"; + + ", channel=" + channel + ", haServerAddr=" + haServerAddr + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/pom.xml ---------------------------------------------------------------------- diff --git a/remoting/pom.xml b/remoting/pom.xml index 6e75c54..15f4443 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -15,7 +15,7 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.rocketmq</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java index ba93f09..98cbb53 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java @@ -6,31 +6,27 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting; import io.netty.channel.Channel; - /** * */ public interface ChannelEventListener { void onChannelConnect(final String remoteAddr, final Channel channel); - void onChannelClose(final String remoteAddr, final Channel channel); - void onChannelException(final String remoteAddr, final Channel channel); - void onChannelIdle(final String remoteAddr, final Channel channel); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCustomHeader.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCustomHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCustomHeader.java index de7d3b0..bd1d122 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCustomHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCustomHeader.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting; import org.apache.rocketmq.remoting.exception.RemotingCommandException; - public interface CommandCustomHeader { void checkFields() throws RemotingCommandException; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java b/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java index 3db5f69..c3bcd87 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting; import org.apache.rocketmq.remoting.netty.ResponseFuture; - /** * */ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java index c489f1d..c118180 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java @@ -19,11 +19,9 @@ package org.apache.rocketmq.remoting; import org.apache.rocketmq.remoting.protocol.RemotingCommand; - public interface RPCHook { void doBeforeRequest(final String remoteAddr, final RemotingCommand request); - void doAfterResponse(final String remoteAddr, final RemotingCommand request, - final RemotingCommand response); + final RemotingCommand response); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java index 5f96a34..6c7f7a9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java @@ -6,16 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting; +import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; @@ -23,10 +25,6 @@ import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import java.util.List; -import java.util.concurrent.ExecutorService; - - /** * */ @@ -34,28 +32,22 @@ public interface RemotingClient extends RemotingService { public void updateNameServerAddressList(final List<String> addrs); - public List<String> getNameServerAddressList(); - public RemotingCommand invokeSync(final String addr, final RemotingCommand request, - final long timeoutMillis) throws InterruptedException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException; - + final long timeoutMillis) throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException; public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis, - final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, - RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; - + final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, + RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis) - throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, - RemotingTimeoutException, RemotingSendRequestException; - + throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, + RemotingTimeoutException, RemotingSendRequestException; public void registerProcessor(final int requestCode, final NettyRequestProcessor processor, - final ExecutorService executor); - + final ExecutorService executor); public boolean isChannelWriteable(final String addr); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java index 98270ec..d0b13fc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java @@ -16,16 +16,14 @@ */ package org.apache.rocketmq.remoting; +import io.netty.channel.Channel; +import java.util.concurrent.ExecutorService; import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.Channel; - -import java.util.concurrent.ExecutorService; - /** * @@ -33,30 +31,24 @@ import java.util.concurrent.ExecutorService; public interface RemotingServer extends RemotingService { void registerProcessor(final int requestCode, final NettyRequestProcessor processor, - final ExecutorService executor); - + final ExecutorService executor); void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); - int localListenPort(); - Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode); - RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, - final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, - RemotingTimeoutException; - + final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, + RemotingTimeoutException; void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, - final InvokeCallback invokeCallback) throws InterruptedException, - RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; - + final InvokeCallback invokeCallback) throws InterruptedException, + RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis) - throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, - RemotingSendRequestException; + throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, + RemotingSendRequestException; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java index 1af2b16..50e89d0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java @@ -20,9 +20,7 @@ package org.apache.rocketmq.remoting; public interface RemotingService { void start(); - void shutdown(); - void registerRPCHook(RPCHook rpcHook); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/annotation/CFNullable.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/annotation/CFNullable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/annotation/CFNullable.java index b552057..fabc06b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/annotation/CFNullable.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/annotation/CFNullable.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.annotation; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java index 180348c..2f2fc77 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.common; @@ -20,28 +20,23 @@ public class Pair<T1, T2> { private T1 object1; private T2 object2; - public Pair(T1 object1, T2 object2) { this.object1 = object1; this.object2 = object2; } - public T1 getObject1() { return object1; } - public void setObject1(T1 object1) { this.object1 = object1; } - public T2 getObject2() { return object2; } - public void setObject2(T2 object2) { this.object2 = object2; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java index 4300537..8d189e7 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java @@ -6,28 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.common; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; import io.netty.channel.Channel; - import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; - +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class RemotingHelper { public static final String ROCKETMQ_REMOTING = "RocketmqRemoting"; @@ -56,8 +54,8 @@ public class RemotingHelper { } public static RemotingCommand invokeSync(final String addr, final RemotingCommand request, - final long timeoutMillis) throws InterruptedException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException { + final long timeoutMillis) throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException { long beginTime = System.currentTimeMillis(); SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr); SocketChannel socketChannel = RemotingUtil.connect(socketAddress); @@ -69,8 +67,7 @@ public class RemotingHelper { socketChannel.configureBlocking(true); //bugfix http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 - socketChannel.socket().setSoTimeout((int) timeoutMillis); - + socketChannel.socket().setSoTimeout((int)timeoutMillis); ByteBuffer byteBufferRequest = request.encode(); while (byteBufferRequest.hasRemaining()) { @@ -86,7 +83,6 @@ public class RemotingHelper { throw new RemotingSendRequestException(addr); } - Thread.sleep(1); } @@ -106,7 +102,6 @@ public class RemotingHelper { throw new RemotingTimeoutException(addr, timeoutMillis); } - Thread.sleep(1); } @@ -125,11 +120,9 @@ public class RemotingHelper { throw new RemotingTimeoutException(addr, timeoutMillis); } - Thread.sleep(1); } - byteBufferBody.flip(); return RemotingCommand.decode(byteBufferBody); } catch (IOException e) { @@ -152,7 +145,6 @@ public class RemotingHelper { } } - public static String parseChannelRemoteAddr(final Channel channel) { if (null == channel) { return ""; @@ -172,19 +164,17 @@ public class RemotingHelper { return ""; } - public static String parseChannelRemoteName(final Channel channel) { if (null == channel) { return ""; } - final InetSocketAddress remote = (InetSocketAddress) channel.remoteAddress(); + final InetSocketAddress remote = (InetSocketAddress)channel.remoteAddress(); if (remote != null) { return remote.getAddress().getHostName(); } return ""; } - public static String parseSocketAddressAddr(SocketAddress socketAddress) { if (socketAddress != null) { final String addr = socketAddress.toString(); @@ -196,10 +186,9 @@ public class RemotingHelper { return ""; } - public static String parseSocketAddressName(SocketAddress socketAddress) { - final InetSocketAddress addrs = (InetSocketAddress) socketAddress; + final InetSocketAddress addrs = (InetSocketAddress)socketAddress; if (addrs != null) { return addrs.getAddress().getHostName(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java index 005471e..bcc2232 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java @@ -6,22 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.common; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.lang.reflect.Method; import java.net.Inet6Address; @@ -36,7 +33,8 @@ import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.ArrayList; import java.util.Enumeration; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RemotingUtil { public static final String OS_NAME = System.getProperty("os.name"); @@ -69,7 +67,7 @@ public class RemotingUtil { try { final Method method = providerClazz.getMethod("provider"); if (method != null) { - final SelectorProvider selectorProvider = (SelectorProvider) method.invoke(null); + final SelectorProvider selectorProvider = (SelectorProvider)method.invoke(null); if (selectorProvider != null) { result = selectorProvider.openSelector(); } @@ -141,7 +139,6 @@ public class RemotingUtil { return null; } - public static String normalizeHostAddress(final InetAddress localHost) { if (localHost instanceof Inet6Address) { return "[" + localHost.getHostAddress() + "]"; @@ -156,22 +153,19 @@ public class RemotingUtil { return isa; } - public static String socketAddress2String(final SocketAddress addr) { StringBuilder sb = new StringBuilder(); - InetSocketAddress inetSocketAddress = (InetSocketAddress) addr; + InetSocketAddress inetSocketAddress = (InetSocketAddress)addr; sb.append(inetSocketAddress.getAddress().getHostAddress()); sb.append(":"); sb.append(inetSocketAddress.getPort()); return sb.toString(); } - public static SocketChannel connect(SocketAddress remote) { return connect(remote, 1000 * 5); } - public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) { SocketChannel sc = null; try { @@ -197,14 +191,13 @@ public class RemotingUtil { return null; } - public static void closeChannel(Channel channel) { final String addrRemote = RemotingHelper.parseChannelRemoteAddr(channel); channel.close().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { log.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote, - future.isSuccess()); + future.isSuccess()); } }); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java index 7734f86..c8d594e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java @@ -6,30 +6,27 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.common; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; - public class SemaphoreReleaseOnlyOnce { private final AtomicBoolean released = new AtomicBoolean(false); private final Semaphore semaphore; - public SemaphoreReleaseOnlyOnce(Semaphore semaphore) { this.semaphore = semaphore; } - public void release() { if (this.semaphore != null) { if (this.released.compareAndSet(false, true)) { @@ -38,7 +35,6 @@ public class SemaphoreReleaseOnlyOnce { } } - public Semaphore getSemaphore() { return semaphore; }