ROCKETMQ-18 Reformat all codes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/388ba7a5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/388ba7a5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/388ba7a5 Branch: refs/heads/master Commit: 388ba7a58465245389a3592904b6fc7ef777dc7a Parents: 95cfb8d Author: yukon <yu...@apache.org> Authored: Wed Dec 28 15:42:48 2016 +0800 Committer: yukon <yu...@apache.org> Committed: Wed Dec 28 15:42:48 2016 +0800 ---------------------------------------------------------------------- broker/pom.xml | 2 +- .../rocketmq/broker/BrokerController.java | 155 +++-- .../rocketmq/broker/BrokerPathConfigHelper.java | 20 +- .../apache/rocketmq/broker/BrokerStartup.java | 41 +- .../broker/client/ClientChannelInfo.java | 32 +- .../client/ClientHousekeepingService.java | 29 +- .../broker/client/ConsumerGroupInfo.java | 73 +- .../client/ConsumerIdsChangeListener.java | 14 +- .../rocketmq/broker/client/ConsumerManager.java | 38 +- .../DefaultConsumerIdsChangeListener.java | 18 +- .../rocketmq/broker/client/ProducerManager.java | 54 +- .../broker/client/net/Broker2Client.java | 96 ++- .../client/rebalance/RebalanceLockManager.java | 107 ++- .../broker/filtersrv/FilterServerManager.java | 36 +- .../broker/filtersrv/FilterServerUtil.java | 13 +- .../broker/latency/BrokerFastFailure.java | 44 +- .../latency/BrokerFixedThreadPoolExecutor.java | 19 +- .../broker/longpolling/ManyPullRequest.java | 18 +- .../NotifyMessageArrivingListener.java | 15 +- .../broker/longpolling/PullRequest.java | 23 +- .../longpolling/PullRequestHoldService.java | 16 +- .../broker/mqtrace/ConsumeMessageContext.java | 36 +- .../broker/mqtrace/ConsumeMessageHook.java | 14 +- .../broker/mqtrace/SendMessageContext.java | 43 +- .../broker/mqtrace/SendMessageHook.java | 14 +- .../broker/offset/ConsumerOffsetManager.java | 39 +- .../rocketmq/broker/out/BrokerOuterAPI.java | 90 +-- .../broker/pagecache/ManyMessageTransfer.java | 18 +- .../broker/pagecache/OneMessageTransfer.java | 18 +- .../broker/pagecache/QueryMessageTransfer.java | 18 +- .../plugin/AbstractPluginMessageStore.java | 28 +- .../broker/plugin/MessageStoreFactory.java | 21 +- .../plugin/MessageStorePluginContext.java | 17 +- .../processor/AbstractSendMessageProcessor.java | 98 ++- .../broker/processor/AdminBrokerProcessor.java | 237 ++++--- .../broker/processor/ClientManageProcessor.java | 61 +- .../processor/ConsumerManageProcessor.java | 68 +- .../processor/EndTransactionProcessor.java | 50 +- .../processor/ForwardRequestProcessor.java | 17 +- .../broker/processor/PullMessageProcessor.java | 100 ++- .../broker/processor/QueryMessageProcessor.java | 61 +- .../broker/processor/SendMessageProcessor.java | 80 +-- .../rocketmq/broker/slave/SlaveSynchronize.java | 54 +- .../subscription/SubscriptionGroupManager.java | 29 +- .../broker/topic/TopicConfigManager.java | 64 +- .../broker/transaction/TransactionRecord.java | 16 +- .../broker/transaction/TransactionStore.java | 20 +- .../transaction/jdbc/JDBCTransactionStore.java | 34 +- .../jdbc/JDBCTransactionStoreConfig.java | 20 +- .../rocketmq/broker/BrokerControllerTest.java | 15 +- .../rocketmq/broker/BrokerTestHarness.java | 22 +- .../rocketmq/broker/api/SendMessageTest.java | 23 +- .../offset/ConsumerOffsetManagerTest.java | 15 +- .../broker/topic/TopicConfigManagerTest.java | 19 +- checkstyle/checkstyle.xml | 8 +- client/pom.xml | 2 +- .../apache/rocketmq/client/ClientConfig.java | 35 +- .../org/apache/rocketmq/client/MQAdmin.java | 33 +- .../org/apache/rocketmq/client/MQHelper.java | 42 +- .../org/apache/rocketmq/client/QueryResult.java | 22 +- .../org/apache/rocketmq/client/Validators.java | 32 +- .../rocketmq/client/admin/MQAdminExtInner.java | 12 +- .../client/common/ThreadLocalIndex.java | 20 +- .../consumer/AllocateMessageQueueStrategy.java | 27 +- .../client/consumer/DefaultMQPullConsumer.java | 75 +-- .../client/consumer/DefaultMQPushConsumer.java | 81 +-- .../rocketmq/client/consumer/MQConsumer.java | 22 +- .../client/consumer/MQPullConsumer.java | 52 +- .../consumer/MQPullConsumerScheduleService.java | 36 +- .../client/consumer/MQPushConsumer.java | 23 +- .../client/consumer/MessageQueueListener.java | 18 +- .../rocketmq/client/consumer/PullCallback.java | 12 +- .../rocketmq/client/consumer/PullResult.java | 30 +- .../rocketmq/client/consumer/PullStatus.java | 12 +- .../client/consumer/PullTaskCallback.java | 13 +- .../client/consumer/PullTaskContext.java | 16 +- .../listener/ConsumeConcurrentlyContext.java | 7 - .../listener/ConsumeConcurrentlyStatus.java | 12 +- .../listener/ConsumeOrderlyContext.java | 19 +- .../consumer/listener/ConsumeOrderlyStatus.java | 12 +- .../consumer/listener/ConsumeReturnType.java | 12 +- .../consumer/listener/MessageListener.java | 12 +- .../listener/MessageListenerConcurrently.java | 18 +- .../listener/MessageListenerOrderly.java | 18 +- .../AllocateMessageQueueAveragely.java | 19 +- .../AllocateMessageQueueAveragelyByCircle.java | 15 +- .../rebalance/AllocateMessageQueueByConfig.java | 19 +- .../AllocateMessageQueueByMachineRoom.java | 21 +- .../consumer/store/LocalFileOffsetStore.java | 51 +- .../consumer/store/OffsetSerializeWrapper.java | 20 +- .../client/consumer/store/OffsetStore.java | 23 +- .../client/consumer/store/ReadOffsetType.java | 12 +- .../consumer/store/RemoteBrokerOffsetStore.java | 53 +- .../client/exception/MQBrokerException.java | 18 +- .../client/exception/MQClientException.java | 17 +- .../client/hook/CheckForbiddenContext.java | 40 +- .../client/hook/CheckForbiddenHook.java | 14 +- .../client/hook/ConsumeMessageContext.java | 32 +- .../client/hook/ConsumeMessageHook.java | 12 +- .../client/hook/FilterMessageContext.java | 29 +- .../rocketmq/client/hook/FilterMessageHook.java | 13 +- .../client/hook/SendMessageContext.java | 35 +- .../rocketmq/client/hook/SendMessageHook.java | 12 +- .../client/impl/ClientRemotingProcessor.java | 56 +- .../rocketmq/client/impl/CommunicationMode.java | 12 +- .../rocketmq/client/impl/FindBrokerResult.java | 15 +- .../rocketmq/client/impl/MQAdminImpl.java | 114 ++-- .../rocketmq/client/impl/MQClientAPIImpl.java | 667 ++++++++++--------- .../rocketmq/client/impl/MQClientManager.java | 26 +- .../ConsumeMessageConcurrentlyService.java | 88 +-- .../consumer/ConsumeMessageOrderlyService.java | 135 ++-- .../impl/consumer/ConsumeMessageService.java | 31 +- .../consumer/DefaultMQPullConsumerImpl.java | 193 +++--- .../consumer/DefaultMQPushConsumerImpl.java | 261 ++++---- .../client/impl/consumer/MQConsumerInner.java | 26 +- .../client/impl/consumer/MessageQueueLock.java | 19 +- .../client/impl/consumer/ProcessQueue.java | 57 +- .../client/impl/consumer/PullAPIWrapper.java | 84 +-- .../impl/consumer/PullMessageService.java | 37 +- .../client/impl/consumer/PullRequest.java | 23 +- .../client/impl/consumer/PullResultExt.java | 22 +- .../client/impl/consumer/RebalanceImpl.java | 75 +-- .../client/impl/consumer/RebalancePullImpl.java | 22 +- .../client/impl/consumer/RebalancePushImpl.java | 32 +- .../client/impl/consumer/RebalanceService.java | 18 +- .../client/impl/factory/MQClientInstance.java | 234 ++++--- .../impl/producer/DefaultMQProducerImpl.java | 216 +++--- .../client/impl/producer/MQProducerInner.java | 27 +- .../client/impl/producer/TopicPublishInfo.java | 17 +- .../latency/LatencyFaultToleranceImpl.java | 52 +- .../client/latency/MQFaultStrategy.java | 7 +- .../rocketmq/client/log/ClientLogger.java | 25 +- .../client/producer/DefaultMQProducer.java | 96 +-- .../producer/LocalTransactionExecuter.java | 13 +- .../client/producer/LocalTransactionState.java | 12 +- .../rocketmq/client/producer/MQProducer.java | 71 +- .../client/producer/MessageQueueSelector.java | 16 +- .../rocketmq/client/producer/SendCallback.java | 13 +- .../rocketmq/client/producer/SendResult.java | 31 +- .../rocketmq/client/producer/SendStatus.java | 12 +- .../producer/TransactionCheckListener.java | 13 +- .../client/producer/TransactionMQProducer.java | 27 +- .../client/producer/TransactionSendResult.java | 15 +- .../selector/SelectMessageQueueByHash.java | 16 +- .../SelectMessageQueueByMachineRoom.java | 21 +- .../selector/SelectMessageQueueByRandoom.java | 19 +- .../client/stat/ConsumerStatsManager.java | 40 +- .../main/resources/logback_rocketmq_client.xml | 2 +- .../apache/rocketmq/client/ValidatorsTest.java | 13 +- common/pom.xml | 2 +- .../apache/rocketmq/common/BrokerConfig.java | 83 +-- .../rocketmq/common/BrokerConfigSingleton.java | 12 +- .../apache/rocketmq/common/ConfigManager.java | 17 +- .../apache/rocketmq/common/Configuration.java | 27 +- .../apache/rocketmq/common/CountDownLatch2.java | 111 ++- .../org/apache/rocketmq/common/DataVersion.java | 36 +- .../org/apache/rocketmq/common/MQVersion.java | 2 - .../java/org/apache/rocketmq/common/MixAll.java | 54 +- .../java/org/apache/rocketmq/common/Pair.java | 17 +- .../apache/rocketmq/common/ServiceState.java | 12 +- .../apache/rocketmq/common/ServiceThread.java | 28 +- .../org/apache/rocketmq/common/SystemClock.java | 12 +- .../rocketmq/common/ThreadFactoryImpl.java | 15 +- .../org/apache/rocketmq/common/TopicConfig.java | 68 +- .../apache/rocketmq/common/TopicFilterType.java | 12 +- .../org/apache/rocketmq/common/UtilAll.java | 114 ++-- .../rocketmq/common/admin/ConsumeStats.java | 21 +- .../rocketmq/common/admin/OffsetWrapper.java | 18 +- .../rocketmq/common/admin/RollbackStats.java | 24 +- .../rocketmq/common/admin/TopicOffset.java | 18 +- .../rocketmq/common/admin/TopicStatsTable.java | 18 +- .../common/constant/DBMsgConstants.java | 12 +- .../rocketmq/common/constant/LoggerName.java | 12 +- .../rocketmq/common/constant/PermName.java | 12 +- .../common/consumer/ConsumeFromWhere.java | 12 +- .../rocketmq/common/filter/FilterAPI.java | 18 +- .../rocketmq/common/filter/FilterContext.java | 14 +- .../rocketmq/common/filter/MessageFilter.java | 1 - .../apache/rocketmq/common/filter/impl/Op.java | 15 +- .../rocketmq/common/filter/impl/Operand.java | 12 +- .../rocketmq/common/filter/impl/Operator.java | 14 +- .../rocketmq/common/filter/impl/PolishExpr.java | 41 +- .../rocketmq/common/filter/impl/Type.java | 12 +- .../org/apache/rocketmq/common/help/FAQUrl.java | 49 +- .../rocketmq/common/hook/FilterCheckHook.java | 14 +- .../apache/rocketmq/common/message/Message.java | 48 +- .../common/message/MessageAccessor.java | 24 +- .../common/message/MessageClientExt.java | 34 +- .../common/message/MessageClientIDSetter.java | 16 +- .../rocketmq/common/message/MessageConst.java | 2 - .../rocketmq/common/message/MessageDecoder.java | 71 +- .../rocketmq/common/message/MessageExt.java | 58 +- .../rocketmq/common/message/MessageId.java | 18 +- .../rocketmq/common/message/MessageQueue.java | 27 +- .../common/message/MessageQueueForC.java | 30 +- .../rocketmq/common/message/MessageType.java | 12 +- .../rocketmq/common/namesrv/NamesrvConfig.java | 25 +- .../rocketmq/common/namesrv/NamesrvUtil.java | 12 +- .../common/namesrv/RegisterBrokerResult.java | 19 +- .../rocketmq/common/namesrv/TopAddressing.java | 53 +- .../common/protocol/MQProtosHelper.java | 17 +- .../rocketmq/common/protocol/RequestCode.java | 16 +- .../rocketmq/common/protocol/ResponseCode.java | 17 +- .../common/protocol/body/BrokerStatsData.java | 19 +- .../common/protocol/body/BrokerStatsItem.java | 18 +- .../rocketmq/common/protocol/body/CMResult.java | 12 +- .../common/protocol/body/ClusterInfo.java | 28 +- .../common/protocol/body/Connection.java | 21 +- .../common/protocol/body/ConsumeByWho.java | 26 +- .../body/ConsumeMessageDirectlyResult.java | 28 +- .../common/protocol/body/ConsumeStatsList.java | 18 +- .../common/protocol/body/ConsumeStatus.java | 24 +- .../protocol/body/ConsumerConnection.java | 31 +- .../body/ConsumerOffsetSerializeWrapper.java | 20 +- .../protocol/body/ConsumerRunningInfo.java | 134 ++-- .../protocol/body/GetConsumerStatusBody.java | 24 +- .../common/protocol/body/GroupList.java | 18 +- .../rocketmq/common/protocol/body/KVTable.java | 18 +- .../protocol/body/LockBatchRequestBody.java | 24 +- .../protocol/body/LockBatchResponseBody.java | 20 +- .../common/protocol/body/ProcessQueueInfo.java | 54 +- .../protocol/body/ProducerConnection.java | 18 +- .../protocol/body/QueryConsumeTimeSpanBody.java | 18 +- .../body/QueryCorrectionOffsetBody.java | 18 +- .../common/protocol/body/QueueTimeSpan.java | 29 +- .../protocol/body/RegisterBrokerBody.java | 20 +- .../common/protocol/body/ResetOffsetBody.java | 18 +- .../protocol/body/ResetOffsetBodyForC.java | 17 +- .../protocol/body/SubscriptionGroupWrapper.java | 24 +- .../body/TopicConfigSerializeWrapper.java | 22 +- .../common/protocol/body/TopicList.java | 20 +- .../protocol/body/UnlockBatchRequestBody.java | 24 +- .../CheckTransactionStateRequestHeader.java | 20 +- .../CheckTransactionStateResponseHeader.java | 24 +- .../header/CloneGroupOffsetRequestHeader.java | 24 +- ...nsumeMessageDirectlyResultRequestHeader.java | 22 +- .../ConsumerSendMsgBackRequestHeader.java | 31 +- .../header/CreateTopicRequestHeader.java | 33 +- .../DeleteSubscriptionGroupRequestHeader.java | 16 +- .../header/DeleteTopicRequestHeader.java | 18 +- .../header/EndTransactionRequestHeader.java | 33 +- .../header/EndTransactionResponseHeader.java | 15 +- .../header/GetAllTopicConfigResponseHeader.java | 15 +- .../header/GetBrokerConfigResponseHeader.java | 18 +- .../header/GetConsumeStatsInBrokerHeader.java | 13 +- .../header/GetConsumeStatsRequestHeader.java | 18 +- .../GetConsumerConnectionListRequestHeader.java | 16 +- .../GetConsumerListByGroupRequestHeader.java | 16 +- .../GetConsumerListByGroupResponseBody.java | 18 +- .../GetConsumerListByGroupResponseHeader.java | 13 +- .../GetConsumerRunningInfoRequestHeader.java | 20 +- .../header/GetConsumerStatusRequestHeader.java | 20 +- .../GetEarliestMsgStoretimeRequestHeader.java | 20 +- .../GetEarliestMsgStoretimeResponseHeader.java | 18 +- .../header/GetMaxOffsetRequestHeader.java | 20 +- .../header/GetMaxOffsetResponseHeader.java | 18 +- .../header/GetMinOffsetRequestHeader.java | 20 +- .../header/GetMinOffsetResponseHeader.java | 18 +- .../GetProducerConnectionListRequestHeader.java | 16 +- .../header/GetTopicStatsInfoRequestHeader.java | 16 +- .../header/GetTopicsByClusterRequestHeader.java | 16 +- .../NotifyConsumerIdsChangedRequestHeader.java | 16 +- .../header/PullMessageRequestHeader.java | 36 +- .../header/PullMessageResponseHeader.java | 24 +- .../QueryConsumeTimeSpanRequestHeader.java | 18 +- .../QueryConsumerOffsetRequestHeader.java | 22 +- .../QueryConsumerOffsetResponseHeader.java | 18 +- .../header/QueryCorrectionOffsetHeader.java | 22 +- .../header/QueryMessageRequestHeader.java | 26 +- .../header/QueryMessageResponseHeader.java | 20 +- .../QueryTopicConsumeByWhoRequestHeader.java | 18 +- .../header/ResetOffsetRequestHeader.java | 22 +- .../header/SearchOffsetRequestHeader.java | 22 +- .../header/SearchOffsetResponseHeader.java | 18 +- .../header/SendMessageRequestHeader.java | 40 +- .../header/SendMessageRequestHeaderV2.java | 24 - .../header/SendMessageResponseHeader.java | 22 +- .../header/UnregisterClientRequestHeader.java | 20 +- .../header/UnregisterClientResponseHeader.java | 13 +- .../UpdateConsumerOffsetRequestHeader.java | 24 +- .../UpdateConsumerOffsetResponseHeader.java | 15 +- .../ViewBrokerStatsDataRequestHeader.java | 18 +- .../header/ViewMessageRequestHeader.java | 18 +- .../header/ViewMessageResponseHeader.java | 15 +- .../RegisterFilterServerRequestHeader.java | 16 +- .../RegisterFilterServerResponseHeader.java | 18 +- ...RegisterMessageFilterClassRequestHeader.java | 22 +- .../namesrv/DeleteKVConfigRequestHeader.java | 18 +- .../DeleteTopicInNamesrvRequestHeader.java | 16 +- .../namesrv/GetKVConfigRequestHeader.java | 18 +- .../namesrv/GetKVConfigResponseHeader.java | 16 +- .../GetKVListByNamespaceRequestHeader.java | 16 +- .../namesrv/GetRouteInfoRequestHeader.java | 18 +- .../namesrv/GetRouteInfoResponseHeader.java | 15 +- .../namesrv/PutKVConfigRequestHeader.java | 20 +- .../namesrv/RegisterBrokerRequestHeader.java | 26 +- .../namesrv/RegisterBrokerResponseHeader.java | 18 +- .../RegisterOrderTopicRequestHeader.java | 20 +- .../namesrv/UnRegisterBrokerRequestHeader.java | 24 +- .../WipeWritePermOfBrokerRequestHeader.java | 16 +- .../WipeWritePermOfBrokerResponseHeader.java | 16 +- .../common/protocol/heartbeat/ConsumeType.java | 15 +- .../common/protocol/heartbeat/ConsumerData.java | 35 +- .../protocol/heartbeat/HeartbeatData.java | 27 +- .../common/protocol/heartbeat/MessageModel.java | 15 +- .../common/protocol/heartbeat/ProducerData.java | 17 +- .../protocol/heartbeat/SubscriptionData.java | 39 +- .../common/protocol/route/BrokerData.java | 20 +- .../common/protocol/route/QueueData.java | 20 +- .../common/protocol/route/TopicRouteData.java | 27 +- .../common/protocol/topic/OffsetMovedEvent.java | 24 +- .../common/queue/ConcurrentTreeMap.java | 27 +- .../rocketmq/common/queue/RoundQueue.java | 15 +- .../rocketmq/common/running/RunningStats.java | 12 +- .../rocketmq/common/stats/MomentStatsItem.java | 19 +- .../common/stats/MomentStatsItemSet.java | 11 +- .../apache/rocketmq/common/stats/StatsItem.java | 68 +- .../rocketmq/common/stats/StatsItemSet.java | 14 +- .../rocketmq/common/stats/StatsSnapshot.java | 18 +- .../subscription/SubscriptionGroupConfig.java | 35 +- .../rocketmq/common/sysflag/MessageSysFlag.java | 3 - .../rocketmq/common/sysflag/PullSysFlag.java | 20 +- .../common/sysflag/SubscriptionSysFlag.java | 5 - .../rocketmq/common/sysflag/TopicSysFlag.java | 8 - .../rocketmq/common/utils/ChannelUtil.java | 15 +- .../rocketmq/common/utils/HttpTinyClient.java | 36 +- .../rocketmq/common/utils/IOTinyUtils.java | 42 +- .../org/apache/rocketmq/common/MixAllTest.java | 18 +- .../rocketmq/common/RemotingUtilTest.java | 13 +- .../org/apache/rocketmq/common/UtilAllTest.java | 31 +- .../rocketmq/common/filter/FilterAPITest.java | 17 +- .../common/protocol/ConsumeStatusTest.java | 13 +- conf/2m-2s-async/broker-a-s.properties | 1 - conf/2m-2s-async/broker-a.properties | 1 - conf/2m-2s-async/broker-b-s.properties | 1 - conf/2m-2s-async/broker-b.properties | 1 - conf/2m-2s-sync/broker-a-s.properties | 1 - conf/2m-2s-sync/broker-a.properties | 1 - conf/2m-2s-sync/broker-b-s.properties | 1 - conf/2m-2s-sync/broker-b.properties | 1 - conf/2m-noslave/broker-a.properties | 1 - conf/2m-noslave/broker-b.properties | 1 - conf/broker.conf | 14 +- conf/logback_broker.xml | 22 +- conf/logback_filtersrv.xml | 4 +- conf/logback_namesrv.xml | 4 +- conf/logback_tools.xml | 4 +- example/pom.xml | 2 +- .../rocketmq/example/benchmark/Consumer.java | 53 +- .../rocketmq/example/benchmark/Producer.java | 52 +- .../example/benchmark/TransactionProducer.java | 67 +- .../example/broadcast/PushConsumer.java | 5 +- .../rocketmq/example/filter/Consumer.java | 8 +- .../rocketmq/example/filter/Producer.java | 6 +- .../rocketmq/example/operation/Consumer.java | 21 +- .../rocketmq/example/operation/Producer.java | 27 +- .../rocketmq/example/ordermessage/Consumer.java | 6 +- .../rocketmq/example/ordermessage/Producer.java | 13 +- .../rocketmq/example/quickstart/Consumer.java | 5 +- .../rocketmq/example/quickstart/Producer.java | 4 +- .../rocketmq/example/simple/AsyncProducer.java | 10 +- .../rocketmq/example/simple/CachedQueue.java | 17 +- .../rocketmq/example/simple/Producer.java | 7 +- .../rocketmq/example/simple/PullConsumer.java | 10 +- .../example/simple/PullScheduleService.java | 2 - .../rocketmq/example/simple/PushConsumer.java | 4 +- .../example/simple/RandomAsyncCommit.java | 23 +- .../rocketmq/example/simple/TestProducer.java | 8 +- .../TransactionCheckListenerImpl.java | 5 +- .../transaction/TransactionExecuterImpl.java | 16 +- .../transaction/TransactionProducer.java | 9 +- .../src/main/resources/MessageFilterImpl.java | 15 +- filtersrv/pom.xml | 2 +- .../filtersrv/FilterServerOuterAPI.java | 17 +- .../rocketmq/filtersrv/FiltersrvConfig.java | 41 +- .../rocketmq/filtersrv/FiltersrvController.java | 61 +- .../rocketmq/filtersrv/FiltersrvStartup.java | 34 +- .../rocketmq/filtersrv/filter/DynaCode.java | 177 +++-- .../filter/FilterClassFetchMethod.java | 12 +- .../filtersrv/filter/FilterClassInfo.java | 19 +- .../filtersrv/filter/FilterClassLoader.java | 12 +- .../filtersrv/filter/FilterClassManager.java | 67 +- .../filter/HttpFilterClassFetchMethod.java | 17 +- .../processor/DefaultRequestProcessor.java | 91 ++- .../stats/FilterServerStatsManager.java | 29 +- namesrv/pom.xml | 2 +- .../rocketmq/namesrv/NamesrvController.java | 45 +- .../apache/rocketmq/namesrv/NamesrvStartup.java | 36 +- .../namesrv/kvconfig/KVConfigManager.java | 45 +- .../kvconfig/KVConfigSerializeWrapper.java | 18 +- .../processor/ClusterTestRequestProcessor.java | 25 +- .../processor/DefaultRequestProcessor.java | 154 ++--- .../routeinfo/BrokerHousekeepingService.java | 20 +- .../namesrv/routeinfo/RouteInfoManager.java | 142 ++-- remoting/pom.xml | 2 +- .../rocketmq/remoting/ChannelEventListener.java | 16 +- .../rocketmq/remoting/CommandCustomHeader.java | 13 +- .../rocketmq/remoting/InvokeCallback.java | 13 +- .../org/apache/rocketmq/remoting/RPCHook.java | 4 +- .../rocketmq/remoting/RemotingClient.java | 38 +- .../rocketmq/remoting/RemotingServer.java | 26 +- .../rocketmq/remoting/RemotingService.java | 2 - .../remoting/annotation/CFNullable.java | 12 +- .../apache/rocketmq/remoting/common/Pair.java | 17 +- .../remoting/common/RemotingHelper.java | 41 +- .../rocketmq/remoting/common/RemotingUtil.java | 29 +- .../common/SemaphoreReleaseOnlyOnce.java | 16 +- .../rocketmq/remoting/common/ServiceThread.java | 19 +- .../exception/RemotingCommandException.java | 14 +- .../exception/RemotingConnectException.java | 14 +- .../remoting/exception/RemotingException.java | 14 +- .../exception/RemotingSendRequestException.java | 14 +- .../exception/RemotingTimeoutException.java | 15 +- .../RemotingTooMuchRequestException.java | 13 +- .../remoting/netty/NettyClientConfig.java | 31 +- .../rocketmq/remoting/netty/NettyDecoder.java | 28 +- .../rocketmq/remoting/netty/NettyEncoder.java | 24 +- .../rocketmq/remoting/netty/NettyEvent.java | 18 +- .../rocketmq/remoting/netty/NettyEventType.java | 12 +- .../remoting/netty/NettyRemotingAbstract.java | 85 ++- .../remoting/netty/NettyRemotingClient.java | 111 ++- .../remoting/netty/NettyRemotingServer.java | 120 ++-- .../remoting/netty/NettyRequestProcessor.java | 18 +- .../remoting/netty/NettyServerConfig.java | 36 +- .../remoting/netty/NettySystemConfig.java | 26 +- .../rocketmq/remoting/netty/RequestTask.java | 25 +- .../rocketmq/remoting/netty/ResponseFuture.java | 45 +- .../remoting/protocol/LanguageCode.java | 30 +- .../remoting/protocol/RemotingCommand.java | 73 +- .../remoting/protocol/RemotingCommandType.java | 12 +- .../remoting/protocol/RemotingSerializable.java | 14 +- .../protocol/RemotingSysResponseCode.java | 12 +- .../remoting/protocol/RocketMQSerializable.java | 54 +- .../remoting/protocol/SerializeType.java | 16 +- .../org/apache/rocketmq/remoting/MixTest.java | 15 +- .../apache/rocketmq/remoting/NettyRPCTest.java | 92 ++- .../rocketmq/subclass/TestSubClassAuto.java | 13 +- srvutil/pom.xml | 2 +- .../org/apache/rocketmq/srvutil/ServerUtil.java | 29 +- store/pom.xml | 2 +- .../store/AllocateMappedFileService.java | 59 +- .../rocketmq/store/AppendMessageCallback.java | 15 +- .../rocketmq/store/AppendMessageResult.java | 42 +- .../rocketmq/store/AppendMessageStatus.java | 12 +- .../org/apache/rocketmq/store/CommitLog.java | 300 ++++----- .../org/apache/rocketmq/store/ConsumeQueue.java | 78 +-- .../rocketmq/store/DefaultMessageFilter.java | 13 +- .../rocketmq/store/DefaultMessageStore.java | 195 +++--- .../apache/rocketmq/store/DispatchRequest.java | 35 +- .../apache/rocketmq/store/GetMessageResult.java | 45 +- .../apache/rocketmq/store/GetMessageStatus.java | 12 +- .../org/apache/rocketmq/store/MappedFile.java | 176 ++--- .../apache/rocketmq/store/MappedFileQueue.java | 82 +-- .../rocketmq/store/MessageArrivingListener.java | 12 +- .../rocketmq/store/MessageExtBrokerInner.java | 17 +- .../apache/rocketmq/store/MessageFilter.java | 13 +- .../org/apache/rocketmq/store/MessageStore.java | 49 +- .../apache/rocketmq/store/PutMessageResult.java | 21 +- .../apache/rocketmq/store/PutMessageStatus.java | 12 +- .../rocketmq/store/QueryMessageResult.java | 23 +- .../rocketmq/store/ReferenceResource.java | 22 +- .../org/apache/rocketmq/store/RunningFlags.java | 26 +- .../store/SelectMappedBufferResult.java | 21 +- .../apache/rocketmq/store/StoreCheckpoint.java | 40 +- .../rocketmq/store/StoreStatsService.java | 88 +-- .../org/apache/rocketmq/store/StoreUtil.java | 16 +- .../rocketmq/store/TransientStorePool.java | 15 +- .../rocketmq/store/config/BrokerRole.java | 12 +- .../rocketmq/store/config/FlushDiskType.java | 12 +- .../store/config/MessageStoreConfig.java | 132 +--- .../store/config/StorePathConfigHelper.java | 19 +- .../apache/rocketmq/store/ha/HAConnection.java | 63 +- .../org/apache/rocketmq/store/ha/HAService.java | 95 +-- .../rocketmq/store/ha/WaitNotifyObject.java | 18 +- .../apache/rocketmq/store/index/IndexFile.java | 72 +- .../rocketmq/store/index/IndexHeader.java | 15 - .../rocketmq/store/index/IndexService.java | 51 +- .../rocketmq/store/index/QueryOffsetResult.java | 19 +- .../schedule/DelayOffsetSerializeWrapper.java | 20 +- .../store/schedule/ScheduleMessageService.java | 112 ++-- .../rocketmq/store/stats/BrokerStats.java | 29 +- .../store/stats/BrokerStatsManager.java | 34 +- .../org/apache/rocketmq/store/util/LibC.java | 15 +- .../rocketmq/store/DefaultMessageStoreTest.java | 34 +- .../rocketmq/store/MappedFileQueueTest.java | 56 +- .../apache/rocketmq/store/MappedFileTest.java | 33 +- .../rocketmq/store/StoreCheckpointTest.java | 20 +- .../rocketmq/store/index/IndexFileTest.java | 25 +- .../store/schedule/ScheduleMessageTest.java | 35 +- store/src/test/resources/logback-test.xml | 24 +- style/copyright/Apache.xml | 9 +- style/copyright/profiles_settings.xml | 84 +-- style/rmq_codeStyle.xml | 204 +++--- tools/pom.xml | 2 +- .../rocketmq/tools/admin/DefaultMQAdminExt.java | 141 ++-- .../tools/admin/DefaultMQAdminExtImpl.java | 212 +++--- .../apache/rocketmq/tools/admin/MQAdminExt.java | 141 ++-- .../rocketmq/tools/admin/api/MessageTrack.java | 21 +- .../rocketmq/tools/admin/api/TrackType.java | 12 +- .../rocketmq/tools/command/CommandUtil.java | 46 +- .../rocketmq/tools/command/MQAdminStartup.java | 72 +- .../rocketmq/tools/command/SubCommand.java | 18 +- .../broker/BrokerConsumeStatsSubCommad.java | 46 +- .../command/broker/BrokerStatusSubCommand.java | 24 +- .../broker/CleanExpiredCQSubCommand.java | 22 +- .../command/broker/CleanUnusedTopicCommand.java | 22 +- .../command/broker/GetBrokerConfigCommand.java | 39 +- .../command/broker/SendMsgStatusCommand.java | 36 +- .../broker/UpdateBrokerConfigSubCommand.java | 29 +- .../cluster/CLusterSendMsgRTCommand.java | 55 +- .../command/cluster/ClusterListSubCommand.java | 89 ++- .../ConsumerConnectionSubCommand.java | 29 +- .../ProducerConnectionSubCommand.java | 17 +- .../consumer/ConsumerProgressSubCommand.java | 84 +-- .../consumer/ConsumerStatusSubCommand.java | 30 +- .../command/consumer/ConsumerSubCommand.java | 32 +- .../DeleteSubscriptionGroupCommand.java | 35 +- .../consumer/StartMonitoringSubCommand.java | 23 +- .../consumer/UpdateSubGroupSubCommand.java | 29 +- .../command/message/CheckMsgSendRTCommand.java | 34 +- .../command/message/DecodeMessageIdCommond.java | 18 +- .../message/PrintMessageByQueueCommand.java | 157 +++-- .../command/message/PrintMessageSubCommand.java | 76 +-- .../command/message/QueryMsgByIdSubCommand.java | 287 ++++---- .../message/QueryMsgByKeySubCommand.java | 27 +- .../message/QueryMsgByOffsetSubCommand.java | 7 +- .../message/QueryMsgByUniqueKeySubCommand.java | 177 +++-- .../rocketmq/tools/command/message/Store.java | 56 +- .../command/namesrv/DeleteKvConfigCommand.java | 22 +- .../namesrv/GetNamesrvConfigCommand.java | 25 +- .../command/namesrv/UpdateKvConfigCommand.java | 22 +- .../namesrv/UpdateNamesrvConfigCommand.java | 27 +- .../namesrv/WipeWritePermSubCommand.java | 34 +- .../command/offset/CloneGroupOffsetCommand.java | 23 +- .../offset/GetConsumerStatusCommand.java | 29 +- .../offset/ResetOffsetByTimeCommand.java | 32 +- .../offset/ResetOffsetByTimeOldCommand.java | 80 ++- .../tools/command/stats/StatsAllSubCommand.java | 176 +++-- .../command/topic/AllocateMQSubCommand.java | 20 +- .../command/topic/DeleteTopicSubCommand.java | 70 +- .../tools/command/topic/RebalanceResult.java | 15 +- .../command/topic/TopicClusterSubCommand.java | 12 +- .../command/topic/TopicListSubCommand.java | 34 +- .../command/topic/TopicRouteSubCommand.java | 10 +- .../command/topic/TopicStatusSubCommand.java | 49 +- .../command/topic/UpdateOrderConfCommand.java | 26 +- .../topic/UpdateTopicPermSubCommand.java | 17 +- .../command/topic/UpdateTopicSubCommand.java | 25 +- .../tools/monitor/DefaultMonitorListener.java | 40 +- .../rocketmq/tools/monitor/DeleteMsgsEvent.java | 20 +- .../rocketmq/tools/monitor/FailedMsgs.java | 21 +- .../rocketmq/tools/monitor/MonitorConfig.java | 19 +- .../rocketmq/tools/monitor/MonitorListener.java | 3 +- .../rocketmq/tools/monitor/MonitorService.java | 65 +- .../rocketmq/tools/monitor/UndoneMsgs.java | 27 +- 555 files changed, 8226 insertions(+), 11139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/pom.xml ---------------------------------------------------------------------- diff --git a/broker/pom.xml b/broker/pom.xml index 0917503..30525e4 100644 --- a/broker/pom.xml +++ b/broker/pom.xml @@ -15,7 +15,7 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.rocketmq</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 8e973ac..501c1c5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -16,7 +16,25 @@ */ package org.apache.rocketmq.broker; -import org.apache.rocketmq.broker.client.*; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.broker.client.ClientHousekeepingService; +import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; +import org.apache.rocketmq.broker.client.ConsumerManager; +import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener; +import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager; import org.apache.rocketmq.broker.filtersrv.FilterServerManager; @@ -30,11 +48,21 @@ import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.broker.plugin.MessageStoreFactory; import org.apache.rocketmq.broker.plugin.MessageStorePluginContext; -import org.apache.rocketmq.broker.processor.*; +import org.apache.rocketmq.broker.processor.AdminBrokerProcessor; +import org.apache.rocketmq.broker.processor.ClientManageProcessor; +import org.apache.rocketmq.broker.processor.ConsumerManageProcessor; +import org.apache.rocketmq.broker.processor.EndTransactionProcessor; +import org.apache.rocketmq.broker.processor.PullMessageProcessor; +import org.apache.rocketmq.broker.processor.QueryMessageProcessor; +import org.apache.rocketmq.broker.processor.SendMessageProcessor; import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; -import org.apache.rocketmq.common.*; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.Configuration; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; @@ -43,7 +71,11 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.stats.MomentStatsItem; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingServer; -import org.apache.rocketmq.remoting.netty.*; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingServer; +import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageStore; @@ -54,15 +86,6 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.*; - - public class BrokerController { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME); @@ -84,7 +107,7 @@ public class BrokerController { private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager(); private final BrokerOuterAPI brokerOuterAPI; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "BrokerControllerScheduledThread")); + "BrokerControllerScheduledThread")); private final SlaveSynchronize slaveSynchronize; private final BlockingQueue<Runnable> sendThreadPoolQueue; private final BlockingQueue<Runnable> pullThreadPoolQueue; @@ -110,10 +133,10 @@ public class BrokerController { private Configuration configuration; public BrokerController(// - final BrokerConfig brokerConfig, // - final NettyServerConfig nettyServerConfig, // - final NettyClientConfig nettyClientConfig, // - final MessageStoreConfig messageStoreConfig // + final BrokerConfig brokerConfig, // + final NettyServerConfig nettyServerConfig, // + final NettyClientConfig nettyClientConfig, // + final MessageStoreConfig messageStoreConfig // ) { this.brokerConfig = brokerConfig; this.nettyServerConfig = nettyServerConfig; @@ -151,9 +174,9 @@ public class BrokerController { this.brokerFastFailure = new BrokerFastFailure(this); this.configuration = new Configuration( - log, - BrokerPathConfigHelper.getBrokerConfigPath(), - this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig + log, + BrokerPathConfigHelper.getBrokerConfigPath(), + this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig ); } @@ -180,9 +203,9 @@ public class BrokerController { if (result) { try { this.messageStore = - new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, - this.brokerConfig); - this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); + new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, + this.brokerConfig); + this.brokerStats = new BrokerStats((DefaultMessageStore)this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); @@ -196,44 +219,43 @@ public class BrokerController { if (result) { this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); - NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); + NettyServerConfig fastConfig = (NettyServerConfig)this.nettyServerConfig.clone(); fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getSendMessageThreadPoolNums(), - this.brokerConfig.getSendMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.sendThreadPoolQueue, - new ThreadFactoryImpl("SendMessageThread_")); + this.brokerConfig.getSendMessageThreadPoolNums(), + this.brokerConfig.getSendMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.sendThreadPoolQueue, + new ThreadFactoryImpl("SendMessageThread_")); this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getPullMessageThreadPoolNums(), - this.brokerConfig.getPullMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.pullThreadPoolQueue, - new ThreadFactoryImpl("PullMessageThread_")); + this.brokerConfig.getPullMessageThreadPoolNums(), + this.brokerConfig.getPullMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.pullThreadPoolQueue, + new ThreadFactoryImpl("PullMessageThread_")); this.adminBrokerExecutor = - Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( - "AdminBrokerThread_")); + Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( + "AdminBrokerThread_")); this.clientManageExecutor = new ThreadPoolExecutor( - this.brokerConfig.getClientManageThreadPoolNums(), - this.brokerConfig.getClientManageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.clientManagerThreadPoolQueue, - new ThreadFactoryImpl("ClientManageThread_")); + this.brokerConfig.getClientManageThreadPoolNums(), + this.brokerConfig.getClientManageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.clientManagerThreadPoolQueue, + new ThreadFactoryImpl("ClientManageThread_")); this.consumerManageExecutor = - Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( - "ConsumerManageThread_")); + Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( + "ConsumerManageThread_")); this.registerProcessor(); - // TODO remove in future final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis(); final long period = 1000 * 60 * 60 * 24; @@ -259,7 +281,6 @@ public class BrokerController { } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -399,7 +420,6 @@ public class BrokerController { this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - /** * EndTransactionProcessor */ @@ -446,7 +466,8 @@ public class BrokerController { slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp(); } - if (slowTimeMills < 0) slowTimeMills = 0; + if (slowTimeMills < 0) + slowTimeMills = 0; return slowTimeMills; } @@ -577,10 +598,10 @@ public class BrokerController { private void unregisterBrokerAll() { this.brokerOuterAPI.unregisterBrokerAll( - this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId()); + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId()); } public String getBrokerAddr() { @@ -643,27 +664,27 @@ public class BrokerController { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) - || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { TopicConfig tmp = - new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), - this.brokerConfig.getBrokerPermission()); + new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), + this.brokerConfig.getBrokerPermission()); topicConfigTable.put(topicConfig.getTopicName(), tmp); } topicConfigWrapper.setTopicConfigTable(topicConfigTable); } RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll( - this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId(), - this.getHAServerAddr(), - topicConfigWrapper, - this.filterServerManager.buildNewFilterServerList(), - oneway, - this.brokerConfig.getRegisterBrokerTimeoutMills()); + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), + this.getHAServerAddr(), + topicConfigWrapper, + this.filterServerManager.buildNewFilterServerList(), + oneway, + this.brokerConfig.getRegisterBrokerTimeoutMills()); if (registerBrokerResult != null) { if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java index dbcd304..7a46df3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java @@ -6,45 +6,39 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker; import java.io.File; - public class BrokerPathConfigHelper { private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store" - + File.separator + "config" + File.separator + "broker.properties"; - + + File.separator + "config" + File.separator + "broker.properties"; public static String getBrokerConfigPath() { return brokerConfigPath; } - public static void setBrokerConfigPath(String path) { brokerConfigPath = path; } - public static String getTopicConfigPath(final String rootDir) { return rootDir + File.separator + "config" + File.separator + "topics.json"; } - public static String getConsumerOffsetPath(final String rootDir) { return rootDir + File.separator + "config" + File.separator + "consumerOffset.json"; } - public static String getSubscriptionGroupPath(final String rootDir) { return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json"; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index 86091c4..dfa97c1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -18,6 +18,15 @@ package org.apache.rocketmq.broker; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -30,20 +39,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - - public class BrokerStartup { public static Properties properties = null; public static CommandLine commandLine = null; @@ -58,7 +56,7 @@ public class BrokerStartup { try { controller.start(); String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", " - + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); + + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); if (null != controller.getBrokerConfig().getNamesrvAddr()) { tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr(); @@ -89,7 +87,7 @@ public class BrokerStartup { //PackageConflictDetect.detectFastjson(); Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), - new PosixParser()); + new PosixParser()); if (null == commandLine) { System.exit(-1); } @@ -142,7 +140,7 @@ public class BrokerStartup { if (null == brokerConfig.getRocketmqHome()) { System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV - + " variable in your environment to match the location of the RocketMQ installation"); + + " variable in your environment to match the location of the RocketMQ installation"); System.exit(-2); } @@ -157,13 +155,12 @@ public class BrokerStartup { } } catch (Exception e) { System.out.printf( - "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", - namesrvAddr); + "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", + namesrvAddr); System.exit(-3); } } - switch (messageStoreConfig.getBrokerRole()) { case ASYNC_MASTER: case SYNC_MASTER: @@ -181,7 +178,7 @@ public class BrokerStartup { } messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); - LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); @@ -194,10 +191,10 @@ public class BrokerStartup { MixAll.printObjectProperties(log, messageStoreConfig); final BrokerController controller = new BrokerController(// - brokerConfig, // - nettyServerConfig, // - nettyClientConfig, // - messageStoreConfig); + brokerConfig, // + nettyServerConfig, // + nettyClientConfig, // + messageStoreConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java index a994503..24cddb9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.client; -import org.apache.rocketmq.remoting.protocol.LanguageCode; import io.netty.channel.Channel; - +import org.apache.rocketmq.remoting.protocol.LanguageCode; public class ClientChannelInfo { private final Channel channel; @@ -27,12 +26,10 @@ public class ClientChannelInfo { private final int version; private volatile long lastUpdateTimestamp = System.currentTimeMillis(); - public ClientChannelInfo(Channel channel) { this(channel, null, null, 0); } - public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) { this.channel = channel; this.clientId = clientId; @@ -40,37 +37,30 @@ public class ClientChannelInfo { this.version = version; } - public Channel getChannel() { return channel; } - public String getClientId() { return clientId; } - public LanguageCode getLanguage() { return language; } - public int getVersion() { return version; } - public long getLastUpdateTimestamp() { return lastUpdateTimestamp; } - public void setLastUpdateTimestamp(long lastUpdateTimestamp) { this.lastUpdateTimestamp = lastUpdateTimestamp; } - @Override public int hashCode() { final int prime = 31; @@ -78,12 +68,11 @@ public class ClientChannelInfo { result = prime * result + ((channel == null) ? 0 : channel.hashCode()); result = prime * result + ((clientId == null) ? 0 : clientId.hashCode()); result = prime * result + ((language == null) ? 0 : language.hashCode()); - result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32)); + result = prime * result + (int)(lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32)); result = prime * result + version; return result; } - @Override public boolean equals(Object obj) { if (this == obj) @@ -92,7 +81,7 @@ public class ClientChannelInfo { return false; if (getClass() != obj.getClass()) return false; - ClientChannelInfo other = (ClientChannelInfo) obj; + ClientChannelInfo other = (ClientChannelInfo)obj; if (channel == null) { if (other.channel != null) return false; @@ -103,10 +92,9 @@ public class ClientChannelInfo { return true; } - @Override public String toString() { return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language - + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]"; + + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java index 856ce72..5d7c0ea 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java @@ -6,42 +6,38 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.client; +import io.netty.channel.Channel; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.remoting.ChannelEventListener; -import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - public class ClientHousekeepingService implements ChannelEventListener { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; private ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread")); - + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread")); public ClientHousekeepingService(final BrokerController brokerController) { this.brokerController = brokerController; } - public void start() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @@ -71,7 +67,6 @@ public class ClientHousekeepingService implements ChannelEventListener { } - @Override public void onChannelClose(String remoteAddr, Channel channel) { this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); @@ -79,7 +74,6 @@ public class ClientHousekeepingService implements ChannelEventListener { this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); } - @Override public void onChannelException(String remoteAddr, Channel channel) { this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); @@ -87,7 +81,6 @@ public class ClientHousekeepingService implements ChannelEventListener { this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); } - @Override public void onChannelIdle(String remoteAddr, Channel channel) { this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java index d5b056e..2656467 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java @@ -16,45 +16,41 @@ */ package org.apache.rocketmq.broker.client; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import io.netty.channel.Channel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ConsumerGroupInfo { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final String groupName; private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = - new ConcurrentHashMap<String, SubscriptionData>(); + new ConcurrentHashMap<String, SubscriptionData>(); private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = - new ConcurrentHashMap<Channel, ClientChannelInfo>(16); + new ConcurrentHashMap<Channel, ClientChannelInfo>(16); private volatile ConsumeType consumeType; private volatile MessageModel messageModel; private volatile ConsumeFromWhere consumeFromWhere; private volatile long lastUpdateTimestamp = System.currentTimeMillis(); - public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel, - ConsumeFromWhere consumeFromWhere) { + ConsumeFromWhere consumeFromWhere) { this.groupName = groupName; this.consumeType = consumeType; this.messageModel = messageModel; this.consumeFromWhere = consumeFromWhere; } - public ClientChannelInfo findChannel(final String clientId) { Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator(); while (it.hasNext()) { @@ -67,17 +63,14 @@ public class ConsumerGroupInfo { return null; } - public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() { return subscriptionTable; } - public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() { return channelInfoTable; } - public List<Channel> getAllChannel() { List<Channel> result = new ArrayList<Channel>(); @@ -86,7 +79,6 @@ public class ConsumerGroupInfo { return result; } - public List<String> getAllClientId() { List<String> result = new ArrayList<String>(); @@ -101,7 +93,6 @@ public class ConsumerGroupInfo { return result; } - public void unregisterChannel(final ClientChannelInfo clientChannelInfo) { ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel()); if (old != null) { @@ -109,13 +100,12 @@ public class ConsumerGroupInfo { } } - public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { final ClientChannelInfo info = this.channelInfoTable.remove(channel); if (info != null) { log.warn( - "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}", - info.toString(), groupName); + "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}", + info.toString(), groupName); return true; } @@ -123,7 +113,7 @@ public class ConsumerGroupInfo { } public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType, - MessageModel messageModel, ConsumeFromWhere consumeFromWhere) { + MessageModel messageModel, ConsumeFromWhere consumeFromWhere) { boolean updated = false; this.consumeType = consumeType; this.messageModel = messageModel; @@ -134,7 +124,7 @@ public class ConsumerGroupInfo { ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew); if (null == prev) { log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType, - messageModel, infoNew.toString()); + messageModel, infoNew.toString()); updated = true; } @@ -142,9 +132,9 @@ public class ConsumerGroupInfo { } else { if (!infoOld.getClientId().equals(infoNew.getClientId())) { log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", - this.groupName, - infoOld.toString(), - infoNew.toString()); + this.groupName, + infoOld.toString(), + infoNew.toString()); this.channelInfoTable.put(infoNew.getChannel(), infoNew); } } @@ -155,7 +145,6 @@ public class ConsumerGroupInfo { return updated; } - public boolean updateSubscription(final Set<SubscriptionData> subList) { boolean updated = false; @@ -166,15 +155,15 @@ public class ConsumerGroupInfo { if (null == prev) { updated = true; log.info("subscription changed, add new topic, group: {} {}", - this.groupName, - sub.toString()); + this.groupName, + sub.toString()); } } else if (sub.getSubVersion() > old.getSubVersion()) { if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) { log.info("subscription changed, group: {} OLD: {} NEW: {}", - this.groupName, - old.toString(), - sub.toString() + this.groupName, + old.toString(), + sub.toString() ); } @@ -182,7 +171,6 @@ public class ConsumerGroupInfo { } } - Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, SubscriptionData> next = it.next(); @@ -198,9 +186,9 @@ public class ConsumerGroupInfo { if (!exist) { log.warn("subscription changed, group: {} remove topic {} {}", - this.groupName, - oldTopic, - next.getValue().toString() + this.groupName, + oldTopic, + next.getValue().toString() ); it.remove(); @@ -213,57 +201,46 @@ public class ConsumerGroupInfo { return updated; } - public Set<String> getSubscribeTopics() { return subscriptionTable.keySet(); } - public SubscriptionData findSubscriptionData(final String topic) { return this.subscriptionTable.get(topic); } - public ConsumeType getConsumeType() { return consumeType; } - public void setConsumeType(ConsumeType consumeType) { this.consumeType = consumeType; } - public MessageModel getMessageModel() { return messageModel; } - public void setMessageModel(MessageModel messageModel) { this.messageModel = messageModel; } - public String getGroupName() { return groupName; } - public long getLastUpdateTimestamp() { return lastUpdateTimestamp; } - public void setLastUpdateTimestamp(long lastUpdateTimestamp) { this.lastUpdateTimestamp = lastUpdateTimestamp; } - public ConsumeFromWhere getConsumeFromWhere() { return consumeFromWhere; } - public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { this.consumeFromWhere = consumeFromWhere; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java index 368582a..fbec010 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java @@ -6,21 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.client; import io.netty.channel.Channel; - import java.util.List; - public interface ConsumerIdsChangeListener { void consumerIdsChanged(final String group, final List<Channel> channels); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index 95ed478..fd4fb88 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -16,6 +16,12 @@ */ package org.apache.rocketmq.broker.client; +import io.netty.channel.Channel; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -23,22 +29,14 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; -import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - - public class ConsumerManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable = - new ConcurrentHashMap<String, ConsumerGroupInfo>(1024); + new ConcurrentHashMap<String, ConsumerGroupInfo>(1024); private final ConsumerIdsChangeListener consumerIdsChangeListener; public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) { @@ -86,7 +84,7 @@ public class ConsumerManager { ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey()); if (remove != null) { log.info("unregister consumer ok, no any connection, and remove consumer group, {}", - next.getKey()); + next.getKey()); } } @@ -96,8 +94,8 @@ public class ConsumerManager { } public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, - ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, - final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { + ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, + final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null == consumerGroupInfo) { @@ -107,8 +105,8 @@ public class ConsumerManager { } boolean r1 = - consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, - consumeFromWhere); + consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, + consumeFromWhere); boolean r2 = consumerGroupInfo.updateSubscription(subList); if (r1 || r2) { @@ -143,7 +141,7 @@ public class ConsumerManager { String group = next.getKey(); ConsumerGroupInfo consumerGroupInfo = next.getValue(); ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = - consumerGroupInfo.getChannelInfoTable(); + consumerGroupInfo.getChannelInfoTable(); Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator(); while (itChannel.hasNext()) { @@ -152,8 +150,8 @@ public class ConsumerManager { long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp(); if (diff > CHANNEL_EXPIRED_TIMEOUT) { log.warn( - "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}", - RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group); + "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}", + RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group); RemotingUtil.closeChannel(clientChannelInfo.getChannel()); itChannel.remove(); } @@ -161,8 +159,8 @@ public class ConsumerManager { if (channelInfoTable.isEmpty()) { log.warn( - "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}", - group); + "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}", + group); it.remove(); } } @@ -174,7 +172,7 @@ public class ConsumerManager { while (it.hasNext()) { Entry<String, ConsumerGroupInfo> entry = it.next(); ConcurrentHashMap<String, SubscriptionData> subscriptionTable = - entry.getValue().getSubscriptionTable(); + entry.getValue().getSubscriptionTable(); if (subscriptionTable.containsKey(topic)) { groups.add(entry.getKey()); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java index b60fcb3..93f73b8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java @@ -6,31 +6,27 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.client; -import org.apache.rocketmq.broker.BrokerController; import io.netty.channel.Channel; - import java.util.List; - +import org.apache.rocketmq.broker.BrokerController; public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener { private final BrokerController brokerController; - public DefaultConsumerIdsChangeListener(BrokerController brokerController) { this.brokerController = brokerController; } - @Override public void consumerIdsChanged(String group, List<Channel> channels) { if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {