http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java b/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java index cda7952..1e13d39 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java @@ -6,28 +6,25 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.example.simple; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; - import java.util.List; import java.util.concurrent.ConcurrentHashMap; - +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; public class RandomAsyncCommit { private final ConcurrentHashMap<MessageQueue, CachedQueue> mqCachedTable = - new ConcurrentHashMap<MessageQueue, CachedQueue>(); - + new ConcurrentHashMap<MessageQueue, CachedQueue>(); public void putMessages(final MessageQueue mq, final List<MessageExt> msgs) { CachedQueue cachedQueue = this.mqCachedTable.get(mq); @@ -40,7 +37,6 @@ public class RandomAsyncCommit { } } - public void removeMessage(final MessageQueue mq, long offset) { CachedQueue cachedQueue = this.mqCachedTable.get(mq); if (null != cachedQueue) { @@ -48,7 +44,6 @@ public class RandomAsyncCommit { } } - public long commitableOffset(final MessageQueue mq) { CachedQueue cachedQueue = this.mqCachedTable.get(mq); if (null != cachedQueue) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java index 0304a63..8787fa8 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java @@ -33,14 +33,14 @@ public class TestProducer { try { { Message msg = new Message("TopicTest1", - "TagA", - "key113", - "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + "TagA", + "key113", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); QueryResult queryMessage = - producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis()); + producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis()); for (MessageExt m : queryMessage.getMessageList()) { System.out.printf("%s%n", m); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java index fea93a8..1beed71 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java @@ -16,17 +16,14 @@ */ package org.apache.rocketmq.example.transaction; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.common.message.MessageExt; -import java.util.concurrent.atomic.AtomicInteger; - - public class TransactionCheckListenerImpl implements TransactionCheckListener { private AtomicInteger transactionIndex = new AtomicInteger(0); - @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.printf("server checking TrMsg " + msg.toString() + "%n"); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java index eb787fd..b767a4a 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java @@ -6,26 +6,24 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.example.transaction; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.common.message.Message; -import java.util.concurrent.atomic.AtomicInteger; - public class TransactionExecuterImpl implements LocalTransactionExecuter { private AtomicInteger transactionIndex = new AtomicInteger(1); - @Override public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { int value = transactionIndex.getAndIncrement(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java index 5a868c6..1609a81 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.example.transaction; +import java.io.UnsupportedEncodingException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionCheckListener; @@ -23,8 +24,6 @@ import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; -import java.io.UnsupportedEncodingException; - public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); @@ -35,13 +34,13 @@ public class TransactionProducer { producer.setTransactionCheckListener(transactionCheckListener); producer.start(); - String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; + String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); for (int i = 0; i < 100; i++) { try { Message msg = - new Message("TopicTest", tags[i % tags.length], "KEY" + i, - ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + new Message("TopicTest", tags[i % tags.length], "KEY" + i, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.printf("%s%n", sendResult); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/resources/MessageFilterImpl.java ---------------------------------------------------------------------- diff --git a/example/src/main/resources/MessageFilterImpl.java b/example/src/main/resources/MessageFilterImpl.java index 3ff3f48..83ca00e 100644 --- a/example/src/main/resources/MessageFilterImpl.java +++ b/example/src/main/resources/MessageFilterImpl.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.example.filter; @@ -20,7 +20,6 @@ package org.apache.rocketmq.example.filter; import org.apache.rocketmq.common.filter.MessageFilter; import org.apache.rocketmq.common.message.MessageExt; - public class MessageFilterImpl implements MessageFilter { @Override @@ -29,7 +28,7 @@ public class MessageFilterImpl implements MessageFilter { if (property != null) { int id = Integer.parseInt(property); if (((id % 10) == 0) && // - (id > 100)) { + (id > 100)) { return true; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/pom.xml ---------------------------------------------------------------------- diff --git a/filtersrv/pom.xml b/filtersrv/pom.xml index bebd10a..cf5388d 100644 --- a/filtersrv/pom.xml +++ b/filtersrv/pom.xml @@ -15,7 +15,7 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.rocketmq</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java index bd16e0d..32b8bad 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java @@ -30,43 +30,38 @@ import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.protocol.RemotingCommand; - public class FilterServerOuterAPI { private final RemotingClient remotingClient; - public FilterServerOuterAPI() { this.remotingClient = new NettyRemotingClient(new NettyClientConfig()); } - public void start() { this.remotingClient.start(); } - public void shutdown() { this.remotingClient.shutdown(); } - public RegisterFilterServerResponseHeader registerFilterServerToBroker( - final String brokerAddr, - final String filterServerAddr + final String brokerAddr, + final String filterServerAddr ) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, InterruptedException, MQBrokerException { + RemotingTimeoutException, InterruptedException, MQBrokerException { RegisterFilterServerRequestHeader requestHeader = new RegisterFilterServerRequestHeader(); requestHeader.setFilterServerAddr(filterServerAddr); RemotingCommand request = - RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader); + RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { RegisterFilterServerResponseHeader responseHeader = - (RegisterFilterServerResponseHeader) response - .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class); + (RegisterFilterServerResponseHeader)response + .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class); return responseHeader; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java index ec0381d..ee2ebee 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.filtersrv; @@ -21,14 +21,13 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.remoting.common.RemotingUtil; - public class FiltersrvConfig { private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, - System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + System.getenv(MixAll.ROCKETMQ_HOME_ENV)); @ImportantField private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, - System.getenv(MixAll.NAMESRV_ADDR_ENV)); + System.getenv(MixAll.NAMESRV_ADDR_ENV)); private String connectWhichBroker = "127.0.0.1:10911"; private String filterServerIP = RemotingUtil.getLocalAddress(); @@ -36,122 +35,98 @@ public class FiltersrvConfig { private int compressMsgBodyOverHowmuch = 1024 * 8; private int zipCompressLevel = 5; - private boolean clientUploadFilterClassEnable = true; - private String filterClassRepertoryUrl = "http://fsrep.tbsite.net/filterclass"; private int fsServerAsyncSemaphoreValue = 2048; private int fsServerCallbackExecutorThreads = 64; private int fsServerWorkerThreads = 64; - public String getRocketmqHome() { return rocketmqHome; } - public void setRocketmqHome(String rocketmqHome) { this.rocketmqHome = rocketmqHome; } - public String getNamesrvAddr() { return namesrvAddr; } - public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } - public String getConnectWhichBroker() { return connectWhichBroker; } - public void setConnectWhichBroker(String connectWhichBroker) { this.connectWhichBroker = connectWhichBroker; } - public String getFilterServerIP() { return filterServerIP; } - public void setFilterServerIP(String filterServerIP) { this.filterServerIP = filterServerIP; } - public int getCompressMsgBodyOverHowmuch() { return compressMsgBodyOverHowmuch; } - public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) { this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; } - public int getZipCompressLevel() { return zipCompressLevel; } - public void setZipCompressLevel(int zipCompressLevel) { this.zipCompressLevel = zipCompressLevel; } - public boolean isClientUploadFilterClassEnable() { return clientUploadFilterClassEnable; } - public void setClientUploadFilterClassEnable(boolean clientUploadFilterClassEnable) { this.clientUploadFilterClassEnable = clientUploadFilterClassEnable; } - public String getFilterClassRepertoryUrl() { return filterClassRepertoryUrl; } - public void setFilterClassRepertoryUrl(String filterClassRepertoryUrl) { this.filterClassRepertoryUrl = filterClassRepertoryUrl; } - public int getFsServerAsyncSemaphoreValue() { return fsServerAsyncSemaphoreValue; } - public void setFsServerAsyncSemaphoreValue(int fsServerAsyncSemaphoreValue) { this.fsServerAsyncSemaphoreValue = fsServerAsyncSemaphoreValue; } - public int getFsServerCallbackExecutorThreads() { return fsServerCallbackExecutorThreads; } - public void setFsServerCallbackExecutorThreads(int fsServerCallbackExecutorThreads) { this.fsServerCallbackExecutorThreads = fsServerCallbackExecutorThreads; } - public int getFsServerWorkerThreads() { return fsServerWorkerThreads; } - public void setFsServerWorkerThreads(int fsServerWorkerThreads) { this.fsServerWorkerThreads = fsServerWorkerThreads; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java index ca136e0..c46b613 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java @@ -16,6 +16,10 @@ */ package org.apache.rocketmq.filtersrv; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -31,12 +35,6 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - public class FiltersrvController { private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); @@ -47,10 +45,10 @@ public class FiltersrvController { private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI(); private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer( - MixAll.FILTERSRV_CONSUMER_GROUP); + MixAll.FILTERSRV_CONSUMER_GROUP); private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread")); + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread")); private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager(); private RemotingServer remotingServer; @@ -58,29 +56,24 @@ public class FiltersrvController { private ExecutorService remotingExecutor; private volatile String brokerName = null; - public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) { this.filtersrvConfig = filtersrvConfig; this.nettyServerConfig = nettyServerConfig; this.filterClassManager = new FilterClassManager(this); } - public boolean initialize() { MixAll.printObjectProperties(log, this.filtersrvConfig); - this.remotingServer = new NettyRemotingServer(this.nettyServerConfig); - this.remotingExecutor = - Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), - new ThreadFactoryImpl("RemotingExecutorThread_")); + Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), + new ThreadFactoryImpl("RemotingExecutorThread_")); this.registerProcessor(); - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override @@ -90,9 +83,9 @@ public class FiltersrvController { }, 3, 10, TimeUnit.SECONDS); this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer - .getBrokerSuspendMaxTimeMillis() - 1000); + .getBrokerSuspendMaxTimeMillis() - 1000); this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer - .getConsumerTimeoutMillisWhenSuspend() - 1000); + .getConsumerTimeoutMillisWhenSuspend() - 1000); this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr()); this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid())); @@ -102,26 +95,26 @@ public class FiltersrvController { private void registerProcessor() { this.remotingServer - .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); + .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } public void registerFilterServerToBroker() { try { RegisterFilterServerResponseHeader responseHeader = - this.filterServerOuterAPI.registerFilterServerToBroker( - this.filtersrvConfig.getConnectWhichBroker(), this.localAddr()); + this.filterServerOuterAPI.registerFilterServerToBroker( + this.filtersrvConfig.getConnectWhichBroker(), this.localAddr()); this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() - .setDefaultBrokerId(responseHeader.getBrokerId()); + .setDefaultBrokerId(responseHeader.getBrokerId()); if (null == this.brokerName) { this.brokerName = responseHeader.getBrokerName(); } log.info("register filter server<{}> to broker<{}> OK, Return: {} {}", - this.localAddr(), - this.filtersrvConfig.getConnectWhichBroker(), - responseHeader.getBrokerName(), - responseHeader.getBrokerId()); + this.localAddr(), + this.filtersrvConfig.getConnectWhichBroker(), + responseHeader.getBrokerName(), + responseHeader.getBrokerId()); } catch (Exception e) { log.warn("register filter server Exception", e); @@ -132,7 +125,7 @@ public class FiltersrvController { public String localAddr() { return String.format("%s:%d", this.filtersrvConfig.getFilterServerIP(), - this.remotingServer.localListenPort()); + this.remotingServer.localListenPort()); } public void start() throws Exception { @@ -140,12 +133,11 @@ public class FiltersrvController { this.remotingServer.start(); this.filterServerOuterAPI.start(); this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() - .setConnectBrokerByUser(true); + .setConnectBrokerByUser(true); this.filterClassManager.start(); this.filterServerStatsManager.start(); } - public void shutdown() { this.remotingServer.shutdown(); this.remotingExecutor.shutdown(); @@ -156,67 +148,54 @@ public class FiltersrvController { this.filterServerStatsManager.shutdown(); } - public RemotingServer getRemotingServer() { return remotingServer; } - public void setRemotingServer(RemotingServer remotingServer) { this.remotingServer = remotingServer; } - public ExecutorService getRemotingExecutor() { return remotingExecutor; } - public void setRemotingExecutor(ExecutorService remotingExecutor) { this.remotingExecutor = remotingExecutor; } - public FiltersrvConfig getFiltersrvConfig() { return filtersrvConfig; } - public NettyServerConfig getNettyServerConfig() { return nettyServerConfig; } - public ScheduledExecutorService getScheduledExecutorService() { return scheduledExecutorService; } - public FilterServerOuterAPI getFilterServerOuterAPI() { return filterServerOuterAPI; } - public FilterClassManager getFilterClassManager() { return filterClassManager; } - public DefaultMQPullConsumer getDefaultMQPullConsumer() { return defaultMQPullConsumer; } - public String getBrokerName() { return brokerName; } - public void setBrokerName(String brokerName) { this.brokerName = brokerName; } - public FilterServerStatsManager getFilterServerStatsManager() { return filterServerStatsManager; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java index 461c79c..f239caf 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java @@ -18,6 +18,15 @@ package org.apache.rocketmq.filtersrv; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -25,20 +34,9 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.NettySystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.srvutil.ServerUtil; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - - public class FiltersrvStartup { public static Logger log; @@ -65,12 +63,10 @@ public class FiltersrvStartup { public static FiltersrvController createController(String[] args) { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { NettySystemConfig.socketSndbufSize = 65535; } - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { NettySystemConfig.socketRcvbufSize = 1024; } @@ -78,8 +74,8 @@ public class FiltersrvStartup { try { Options options = ServerUtil.buildCommandlineOptions(new Options()); final CommandLine commandLine = - ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options), - new PosixParser()); + ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options), + new PosixParser()); if (null == commandLine) { System.exit(-1); return null; @@ -108,7 +104,7 @@ public class FiltersrvStartup { nettyServerConfig.setListenPort(0); nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue()); nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig - .getFsServerCallbackExecutorThreads()); + .getFsServerCallbackExecutorThreads()); nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads()); if (commandLine.hasOption('p')) { @@ -120,11 +116,11 @@ public class FiltersrvStartup { MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig); if (null == filtersrvConfig.getRocketmqHome()) { System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV - + " variable in your environment to match the location of the RocketMQ installation%n"); + + " variable in your environment to match the location of the RocketMQ installation%n"); System.exit(-2); } - LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); @@ -132,7 +128,7 @@ public class FiltersrvStartup { log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); final FiltersrvController controller = - new FiltersrvController(filtersrvConfig, nettyServerConfig); + new FiltersrvController(filtersrvConfig, nettyServerConfig); boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java index fd95685..11102d0 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java @@ -6,35 +6,43 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.filtersrv.filter; +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.UnsupportedEncodingException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.tools.JavaCompiler; +import javax.tools.ToolProvider; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.tools.JavaCompiler; -import javax.tools.ToolProvider; -import java.io.*; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.net.URLDecoder; -import java.util.*; - - public class DynaCode { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); @@ -43,46 +51,35 @@ public class DynaCode { private static final String LINE_SP = System.getProperty("line.separator"); private String sourcePath = System.getProperty("user.home") + FILE_SP + "rocketmq_filter_class" + FILE_SP - + UtilAll.getPid(); + + UtilAll.getPid(); private String outPutClassPath = sourcePath; - private ClassLoader parentClassLoader; - private List<String> codeStrs; - private Map<String/* fullClassName */, Class<?>/* class */> loadClass; - private String classpath; - private String bootclasspath; - private String extdirs; - private String encoding = "UTF-8"; - private String target; - @SuppressWarnings("unchecked") public DynaCode(String code) { this(Thread.currentThread().getContextClassLoader(), Arrays.asList(code)); } - public DynaCode(ClassLoader parentClassLoader, List<String> codeStrs) { this(extractClasspath(parentClassLoader), parentClassLoader, codeStrs); } - public DynaCode(String classpath, ClassLoader parentClassLoader, List<String> codeStrs) { this.classpath = classpath; this.parentClassLoader = parentClassLoader; @@ -90,12 +87,15 @@ public class DynaCode { this.loadClass = new HashMap<String, Class<?>>(codeStrs.size()); } + public DynaCode(List<String> codeStrs) { + this(Thread.currentThread().getContextClassLoader(), codeStrs); + } private static String extractClasspath(ClassLoader cl) { StringBuffer buf = new StringBuffer(); while (cl != null) { if (cl instanceof URLClassLoader) { - URL urls[] = ((URLClassLoader) cl).getURLs(); + URL urls[] = ((URLClassLoader)cl).getURLs(); for (int i = 0; i < urls.length; i++) { if (buf.length() > 0) { buf.append(File.pathSeparatorChar); @@ -115,13 +115,8 @@ public class DynaCode { return buf.toString(); } - - public DynaCode(List<String> codeStrs) { - this(Thread.currentThread().getContextClassLoader(), codeStrs); - } - public static Class<?> compileAndLoadClass(final String className, final String javaSource) - throws Exception { + throws Exception { String classSimpleName = FilterAPI.simpleClassName(className); String javaCode = javaSource; @@ -138,16 +133,6 @@ public class DynaCode { return clazz; } - public void compileAndLoadClass() throws Exception { - String[] sourceFiles = this.uploadSrcFile(); - this.compile(sourceFiles); - this.loadClass(this.loadClass.keySet()); - } - - public Map<String, Class<?>> getLoadClass() { - return loadClass; - } - public static String getQualifiedName(String code) { StringBuilder sb = new StringBuilder(); String className = getClassName(code); @@ -162,6 +147,57 @@ public class DynaCode { return sb.toString(); } + public static String getClassName(String code) { + String className = StringUtils.substringBefore(code, "{"); + if (StringUtils.isBlank(className)) { + return className; + } + if (StringUtils.contains(code, " class ")) { + className = StringUtils.substringAfter(className, " class "); + if (StringUtils.contains(className, " extends ")) { + className = StringUtils.substringBefore(className, " extends ").trim(); + } else if (StringUtils.contains(className, " implements ")) { + className = StringUtils.trim(StringUtils.substringBefore(className, " implements ")); + } else { + className = StringUtils.trim(className); + } + } else if (StringUtils.contains(code, " interface ")) { + className = StringUtils.substringAfter(className, " interface "); + if (StringUtils.contains(className, " extends ")) { + className = StringUtils.substringBefore(className, " extends ").trim(); + } else { + className = StringUtils.trim(className); + } + } else if (StringUtils.contains(code, " enum ")) { + className = StringUtils.trim(StringUtils.substringAfter(className, " enum ")); + } else { + return StringUtils.EMPTY; + } + return className; + } + + public static String getPackageName(String code) { + String packageName = + StringUtils.substringBefore(StringUtils.substringAfter(code, "package "), ";").trim(); + return packageName; + } + + public static String getFullClassName(String code) { + String packageName = getPackageName(code); + String className = getClassName(code); + return StringUtils.isBlank(packageName) ? className : packageName + "." + className; + } + + public void compileAndLoadClass() throws Exception { + String[] sourceFiles = this.uploadSrcFile(); + this.compile(sourceFiles); + this.loadClass(this.loadClass.keySet()); + } + + public Map<String, Class<?>> getLoadClass() { + return loadClass; + } + private String[] uploadSrcFile() throws Exception { List<String> srcFileAbsolutePaths = new ArrayList<String>(codeStrs.size()); for (String code : codeStrs) { @@ -201,7 +237,7 @@ public class DynaCode { srcFile.deleteOnExit(); } OutputStreamWriter outputStreamWriter = - new OutputStreamWriter(new FileOutputStream(srcFile), encoding); + new OutputStreamWriter(new FileOutputStream(srcFile), encoding); bufferWriter = new BufferedWriter(outputStreamWriter); for (String lineCode : code.split(LINE_SP)) { bufferWriter.write(lineCode); @@ -225,7 +261,7 @@ public class DynaCode { JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); if (compiler == null) { throw new NullPointerException( - "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!"); + "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!"); } int resultCode = compiler.run(null, null, err, args); if (resultCode != 0) { @@ -236,8 +272,8 @@ public class DynaCode { private void loadClass(Set<String> classFullNames) throws ClassNotFoundException, MalformedURLException { synchronized (loadClass) { ClassLoader classLoader = - new URLClassLoader(new URL[]{new File(outPutClassPath).toURI().toURL()}, - parentClassLoader); + new URLClassLoader(new URL[] {new File(outPutClassPath).toURI().toURL()}, + parentClassLoader); for (String key : classFullNames) { Class<?> classz = classLoader.loadClass(key); if (null != classz) { @@ -250,47 +286,6 @@ public class DynaCode { } } - public static String getClassName(String code) { - String className = StringUtils.substringBefore(code, "{"); - if (StringUtils.isBlank(className)) { - return className; - } - if (StringUtils.contains(code, " class ")) { - className = StringUtils.substringAfter(className, " class "); - if (StringUtils.contains(className, " extends ")) { - className = StringUtils.substringBefore(className, " extends ").trim(); - } else if (StringUtils.contains(className, " implements ")) { - className = StringUtils.trim(StringUtils.substringBefore(className, " implements ")); - } else { - className = StringUtils.trim(className); - } - } else if (StringUtils.contains(code, " interface ")) { - className = StringUtils.substringAfter(className, " interface "); - if (StringUtils.contains(className, " extends ")) { - className = StringUtils.substringBefore(className, " extends ").trim(); - } else { - className = StringUtils.trim(className); - } - } else if (StringUtils.contains(code, " enum ")) { - className = StringUtils.trim(StringUtils.substringAfter(className, " enum ")); - } else { - return StringUtils.EMPTY; - } - return className; - } - - public static String getPackageName(String code) { - String packageName = - StringUtils.substringBefore(StringUtils.substringAfter(code, "package "), ";").trim(); - return packageName; - } - - public static String getFullClassName(String code) { - String packageName = getPackageName(code); - String className = getClassName(code); - return StringUtils.isBlank(packageName) ? className : packageName + "." + className; - } - private String[] buildCompileJavacArgs(String srcFiles[]) { ArrayList<String> args = new ArrayList<String>(); if (StringUtils.isNotBlank(classpath)) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java index 36d6b7e..89f1883 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.filtersrv.filter; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java index d278fe3..1cb0e96 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassInfo.java @@ -6,51 +6,44 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.filtersrv.filter; import org.apache.rocketmq.common.filter.MessageFilter; - public class FilterClassInfo { private String className; private int classCRC; private MessageFilter messageFilter; - public int getClassCRC() { return classCRC; } - public void setClassCRC(int classCRC) { this.classCRC = classCRC; } - public MessageFilter getMessageFilter() { return messageFilter; } - public void setMessageFilter(MessageFilter messageFilter) { this.messageFilter = messageFilter; } - public String getClassName() { return className; } - public void setClassName(String className) { this.className = className; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java index 3269852..32f5ac2 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.filtersrv.filter; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java index fab4d7d..66389e0 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java @@ -6,17 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.filtersrv.filter; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; @@ -26,14 +32,6 @@ import org.apache.rocketmq.filtersrv.FiltersrvController; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - public class FilterClassManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); @@ -41,19 +39,21 @@ public class FilterClassManager { private final FiltersrvController filtersrvController; private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread")); + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread")); private ConcurrentHashMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable = - new ConcurrentHashMap<String, FilterClassInfo>(128); + new ConcurrentHashMap<String, FilterClassInfo>(128); private FilterClassFetchMethod filterClassFetchMethod; - public FilterClassManager(FiltersrvController filtersrvController) { this.filtersrvController = filtersrvController; this.filterClassFetchMethod = - new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig() - .getFilterClassRepertoryUrl()); + new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig() + .getFilterClassRepertoryUrl()); } + private static String buildKey(final String consumerGroup, final String topic) { + return topic + "@" + consumerGroup; + } public void start() { if (!this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { @@ -75,20 +75,20 @@ public class FilterClassManager { FilterClassInfo filterClassInfo = next.getValue(); String[] topicAndGroup = next.getKey().split("@"); String responseStr = - this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1], - filterClassInfo.getClassName()); + this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1], + filterClassInfo.getClassName()); byte[] filterSourceBinary = responseStr.getBytes("UTF-8"); int classCRC = UtilAll.crc32(responseStr.getBytes("UTF-8")); if (classCRC != filterClassInfo.getClassCRC()) { String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); Class<?> newClass = - DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource); + DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource); Object newInstance = newClass.newInstance(); - filterClassInfo.setMessageFilter((MessageFilter) newInstance); + filterClassInfo.setMessageFilter((MessageFilter)newInstance); filterClassInfo.setClassCRC(classCRC); log.info("fetch Remote class File OK, {} {}", next.getKey(), - filterClassInfo.getClassName()); + filterClassInfo.getClassName()); } } catch (Exception e) { log.error("fetchClassFromRemoteHost Exception", e); @@ -101,10 +101,9 @@ public class FilterClassManager { } public boolean registerFilterClass(final String consumerGroup, final String topic, - final String className, final int classCRC, final byte[] filterSourceBinary) { + final String className, final int classCRC, final byte[] filterSourceBinary) { final String key = buildKey(consumerGroup, topic); - boolean registerNew = false; FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key); if (null == filterClassInfoPrev) { @@ -135,17 +134,17 @@ public class FilterClassManager { String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource); Object newInstance = newClass.newInstance(); - filterClassInfoNew.setMessageFilter((MessageFilter) newInstance); + filterClassInfoNew.setMessageFilter((MessageFilter)newInstance); filterClassInfoNew.setClassCRC(classCRC); } this.filterClassTable.put(key, filterClassInfoNew); } catch (Throwable e) { String info = - String - .format( - "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s", - consumerGroup, topic, className); + String + .format( + "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s", + consumerGroup, topic, className); log.error(info, e); return false; } @@ -155,20 +154,14 @@ public class FilterClassManager { return true; } - private static String buildKey(final String consumerGroup, final String topic) { - return topic + "@" + consumerGroup; - } - public FilterClassInfo findFilterClass(final String consumerGroup, final String topic) { return this.filterClassTable.get(buildKey(consumerGroup, topic)); } - public FilterClassFetchMethod getFilterClassFetchMethod() { return filterClassFetchMethod; } - public void setFilterClassFetchMethod(FilterClassFetchMethod filterClassFetchMethod) { this.filterClassFetchMethod = filterClassFetchMethod; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java index c8b1515..99bfad0 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.filtersrv.filter; @@ -23,17 +23,14 @@ import org.apache.rocketmq.common.utils.HttpTinyClient.HttpResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class HttpFilterClassFetchMethod implements FilterClassFetchMethod { private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); private final String url; - public HttpFilterClassFetchMethod(String url) { this.url = url; } - @Override public String fetch(String topic, String consumerGroup, String className) { String thisUrl = String.format("%s/%s.java", this.url, className); @@ -45,7 +42,7 @@ public class HttpFilterClassFetchMethod implements FilterClassFetchMethod { } } catch (Exception e) { log.error( - String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e); + String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e); } return null; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java index f2c98ae..1d56ac1 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java @@ -16,6 +16,13 @@ */ package org.apache.rocketmq.filtersrv.processor; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; @@ -39,36 +46,25 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.CommitLog; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - - public class DefaultRequestProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); private final FiltersrvController filtersrvController; - public DefaultRequestProcessor(FiltersrvController filtersrvController) { this.filtersrvController = filtersrvController; } - @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { if (log.isDebugEnabled()) { log.debug("receive request, {} {} {}", - request.getCode(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - request); + request.getCode(), + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + request); } switch (request.getCode()) { @@ -89,14 +85,14 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RegisterMessageFilterClassRequestHeader requestHeader = - (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class); + (RegisterMessageFilterClassRequestHeader)request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class); try { boolean ok = this.filtersrvController.getFilterClassManager().registerFilterClass(requestHeader.getConsumerGroup(), - requestHeader.getTopic(), - requestHeader.getClassName(), - requestHeader.getClassCRC(), - request.getBody()); + requestHeader.getTopic(), + requestHeader.getClassName(), + requestHeader.getClassCRC(), + request.getBody()); if (!ok) { throw new Exception("registerFilterClass error"); } @@ -113,20 +109,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception { final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); - final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); + final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader)response.readCustomHeader(); final PullMessageRequestHeader requestHeader = - (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); + (PullMessageRequestHeader)request.decodeCommandCustomHeader(PullMessageRequestHeader.class); final FilterContext filterContext = new FilterContext(); filterContext.setConsumerGroup(requestHeader.getConsumerGroup()); - response.setOpaque(request.getOpaque()); DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer(); final FilterClassInfo findFilterClass = - this.filtersrvController.getFilterClassManager() - .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic()); + this.filtersrvController.getFilterClassManager() + .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic()); if (null == findFilterClass) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("Find Filter class failed, not registered"); @@ -141,7 +136,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); - MessageQueue mq = new MessageQueue(); mq.setTopic(requestHeader.getTopic()); mq.setQueueId(requestHeader.getQueueId()); @@ -171,7 +165,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { } } - if (!msgListOK.isEmpty()) { returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK); return; @@ -180,8 +173,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { } } catch (Throwable e) { final String error = - String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ", - requestHeader.getConsumerGroup(), requestHeader.getTopic()); + String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ", + requestHeader.getConsumerGroup(), requestHeader.getTopic()); log.error(error, e); response.setCode(ResponseCode.SYSTEM_ERROR); @@ -207,7 +200,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); } - @Override public void onException(Throwable e) { response.setCode(ResponseCode.SYSTEM_ERROR); @@ -223,7 +215,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { } private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, final RemotingCommand response, - final List<MessageExt> msgList) { + final List<MessageExt> msgList) { if (null != msgList) { ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()]; int bodyTotalSize = 0; @@ -244,7 +236,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { response.setBody(body.array()); - this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic, msgList.size()); this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic, bodyTotalSize); @@ -285,23 +276,23 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { byte[] propertiesData = properties.getBytes(MixAll.DEFAULT_CHARSET); final int propertiesLength = propertiesData.length; final int msgLen = 4 // 1 TOTALSIZE - + 4 // 2 MAGICCODE - + 4 // 3 BODYCRC - + 4 // 4 QUEUEID - + 4 // 5 FLAG - + 8 // 6 QUEUEOFFSET - + 8 // 7 PHYSICALOFFSET - + 4 // 8 SYSFLAG - + 8 // 9 BORNTIMESTAMP - + 8 // 10 BORNHOST - + 8 // 11 STORETIMESTAMP - + 8 // 12 STOREHOSTADDRESS - + 4 // 13 RECONSUMETIMES - + 8 // 14 Prepared Transaction Offset - + 4 + bodyLength // 14 BODY - + 1 + topicLength // 15 TOPIC - + 2 + propertiesLength // 16 propertiesLength - + 0; + + 4 // 2 MAGICCODE + + 4 // 3 BODYCRC + + 4 // 4 QUEUEID + + 4 // 5 FLAG + + 8 // 6 QUEUEOFFSET + + 8 // 7 PHYSICALOFFSET + + 4 // 8 SYSFLAG + + 8 // 9 BORNTIMESTAMP + + 8 // 10 BORNHOST + + 8 // 11 STORETIMESTAMP + + 8 // 12 STOREHOSTADDRESS + + 4 // 13 RECONSUMETIMES + + 8 // 14 Prepared Transaction Offset + + 4 + bodyLength // 14 BODY + + 1 + topicLength // 15 TOPIC + + 2 + propertiesLength // 16 propertiesLength + + 0; ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen); @@ -340,10 +331,10 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { if (bodyLength > 0) msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC - msgStoreItemMemory.put((byte) topicLength); + msgStoreItemMemory.put((byte)topicLength); msgStoreItemMemory.put(topicData); // 17 PROPERTIES - msgStoreItemMemory.putShort((short) propertiesLength); + msgStoreItemMemory.putShort((short)propertiesLength); if (propertiesLength > 0) msgStoreItemMemory.put(propertiesData); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java index 8665fbd..4f44e99 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java @@ -6,59 +6,52 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.filtersrv.stats; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.stats.StatsItemSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - - public class FilterServerStatsManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread")); + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread")); // ConsumerGroup Get Nums private final StatsItemSet groupGetNums = new StatsItemSet("GROUP_GET_NUMS", - this.scheduledExecutorService, log); + this.scheduledExecutorService, log); // ConsumerGroup Get Size private final StatsItemSet groupGetSize = new StatsItemSet("GROUP_GET_SIZE", - this.scheduledExecutorService, log); - + this.scheduledExecutorService, log); public FilterServerStatsManager() { } - public void start() { } - public void shutdown() { this.scheduledExecutorService.shutdown(); } - public void incGroupGetNums(final String group, final String topic, final int incValue) { this.groupGetNums.addValue(topic + "@" + group, incValue, 1); } - public void incGroupGetSize(final String group, final String topic, final int incValue) { this.groupGetSize.addValue(topic + "@" + group, incValue, 1); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/pom.xml ---------------------------------------------------------------------- diff --git a/namesrv/pom.xml b/namesrv/pom.xml index 2ec2f5f..27a1c84 100644 --- a/namesrv/pom.xml +++ b/namesrv/pom.xml @@ -15,7 +15,7 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.rocketmq</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java index 4c286e0..b212adb 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java @@ -6,16 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.namesrv; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.Configuration; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; @@ -31,12 +35,6 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - public class NamesrvController { private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); @@ -45,7 +43,7 @@ public class NamesrvController { private final NettyServerConfig nettyServerConfig; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "NSScheduledThread")); + "NSScheduledThread")); private final KVConfigManager kvConfigManager; private final RouteInfoManager routeInfoManager; @@ -57,7 +55,6 @@ public class NamesrvController { private Configuration configuration; - public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this.namesrvConfig = namesrvConfig; this.nettyServerConfig = nettyServerConfig; @@ -65,26 +62,23 @@ public class NamesrvController { this.routeInfoManager = new RouteInfoManager(); this.brokerHousekeepingService = new BrokerHousekeepingService(this); this.configuration = new Configuration( - log, - this.namesrvConfig, this.nettyServerConfig + log, + this.namesrvConfig, this.nettyServerConfig ); this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); } - public boolean initialize() { this.kvConfigManager.load(); this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); - this.remotingExecutor = - Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); + Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); this.registerProcessor(); - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override @@ -104,56 +98,47 @@ public class NamesrvController { return true; } - private void registerProcessor() { if (namesrvConfig.isClusterTest()) { this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), - this.remotingExecutor); + this.remotingExecutor); } else { this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } } - public void start() throws Exception { this.remotingServer.start(); } - public void shutdown() { this.remotingServer.shutdown(); this.remotingExecutor.shutdown(); this.scheduledExecutorService.shutdown(); } - public NamesrvConfig getNamesrvConfig() { return namesrvConfig; } - public NettyServerConfig getNettyServerConfig() { return nettyServerConfig; } - public KVConfigManager getKvConfigManager() { return kvConfigManager; } - public RouteInfoManager getRouteInfoManager() { return routeInfoManager; } - public RemotingServer getRemotingServer() { return remotingServer; } - public void setRemotingServer(RemotingServer remotingServer) { this.remotingServer = remotingServer; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java index be824cd..0eb9a52 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java @@ -18,6 +18,15 @@ package org.apache.rocketmq.namesrv; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -26,20 +35,9 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.NettySystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.srvutil.ServerUtil; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - - public class NamesrvStartup { public static Properties properties = null; public static CommandLine commandLine = null; @@ -51,12 +49,10 @@ public class NamesrvStartup { public static NamesrvController main0(String[] args) { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { NettySystemConfig.socketSndbufSize = 4096; } - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { NettySystemConfig.socketRcvbufSize = 4096; } @@ -66,14 +62,13 @@ public class NamesrvStartup { Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = - ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), - new PosixParser()); + ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), + new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } - final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876); @@ -93,7 +88,6 @@ public class NamesrvStartup { } } - if (commandLine.hasOption('p')) { MixAll.printObjectProperties(null, namesrvConfig); MixAll.printObjectProperties(null, nettyServerConfig); @@ -104,22 +98,20 @@ public class NamesrvStartup { if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV - + " variable in your environment to match the location of the RocketMQ installation%n"); + + " variable in your environment to match the location of the RocketMQ installation%n"); System.exit(-2); } - LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); - MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); - final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); // remember all configs to prevent discard @@ -135,7 +127,6 @@ public class NamesrvStartup { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); - @Override public void run() { synchronized (this) { @@ -151,7 +142,6 @@ public class NamesrvStartup { } }, "ShutdownHook")); - controller.start(); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();