http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index 0f9954b..3047faf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -6,23 +6,17 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.client; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.common.RemotingUtil; import io.netty.channel.Channel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -30,7 +24,11 @@ import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ProducerManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -38,16 +36,14 @@ public class ProducerManager { private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; private final Lock groupChannelLock = new ReentrantLock(); private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable = - new HashMap<String, HashMap<Channel, ClientChannelInfo>>(); - + new HashMap<String, HashMap<Channel, ClientChannelInfo>>(); public ProducerManager() { } - public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() { HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable = - new HashMap<String, HashMap<Channel, ClientChannelInfo>>(); + new HashMap<String, HashMap<Channel, ClientChannelInfo>>(); try { if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { @@ -62,13 +58,12 @@ public class ProducerManager { return newGroupChannelTable; } - public void scanNotActiveChannel() { try { if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable - .entrySet()) { + .entrySet()) { final String group = entry.getKey(); final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue(); @@ -82,8 +77,8 @@ public class ProducerManager { if (diff > CHANNEL_EXPIRED_TIMEOUT) { it.remove(); log.warn( - "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", - RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group); + "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", + RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group); RemotingUtil.closeChannel(info.getChannel()); } } @@ -99,23 +94,22 @@ public class ProducerManager { } } - public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { if (channel != null) { try { if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable - .entrySet()) { + .entrySet()) { final String group = entry.getKey(); final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable = - entry.getValue(); + entry.getValue(); final ClientChannelInfo clientChannelInfo = - clientChannelInfoTable.remove(channel); + clientChannelInfoTable.remove(channel); if (clientChannelInfo != null) { log.info( - "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", - clientChannelInfo.toString(), remoteAddr, group); + "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", + clientChannelInfo.toString(), remoteAddr, group); } } @@ -131,7 +125,6 @@ public class ProducerManager { } } - public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) { try { ClientChannelInfo clientChannelInfoFound = null; @@ -148,7 +141,7 @@ public class ProducerManager { if (null == clientChannelInfoFound) { channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo); log.info("new producer connected, group: {} channel: {}", group, - clientChannelInfo.toString()); + clientChannelInfo.toString()); } } finally { this.groupChannelLock.unlock(); @@ -165,7 +158,6 @@ public class ProducerManager { } } - public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) { try { if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { @@ -175,7 +167,7 @@ public class ProducerManager { ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel()); if (old != null) { log.info("unregister a producer[{}] from groupChannelTable {}", group, - clientChannelInfo.toString()); + clientChannelInfo.toString()); } if (channelTable.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index 40eff81..f6d7955 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -16,6 +16,16 @@ */ package org.apache.rocketmq.broker.client.net; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.FileRegion; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -40,21 +50,9 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.SelectMappedBufferResult; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.FileRegion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - - public class Broker2Client { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -64,17 +62,17 @@ public class Broker2Client { } public void checkProducerTransactionState( - final Channel channel, - final CheckTransactionStateRequestHeader requestHeader, - final SelectMappedBufferResult selectMappedBufferResult) { + final Channel channel, + final CheckTransactionStateRequestHeader requestHeader, + final SelectMappedBufferResult selectMappedBufferResult) { RemotingCommand request = - RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader); + RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader); request.markOnewayRPC(); try { FileRegion fileRegion = - new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()), - selectMappedBufferResult); + new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()), + selectMappedBufferResult); channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -91,14 +89,14 @@ public class Broker2Client { } public RemotingCommand callClient(final Channel channel, - final RemotingCommand request + final RemotingCommand request ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000); } public void notifyConsumerIdsChanged( - final Channel channel, - final String consumerGroup) { + final Channel channel, + final String consumerGroup) { if (null == consumerGroup) { log.error("notifyConsumerIdsChanged consumerGroup is null"); return; @@ -107,7 +105,7 @@ public class Broker2Client { NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); RemotingCommand request = - RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); + RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); try { this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); @@ -121,7 +119,7 @@ public class Broker2Client { } public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce, - boolean isC) { + boolean isC) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); @@ -141,7 +139,7 @@ public class Broker2Client { mq.setQueueId(i); long consumerOffset = - this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i); + this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i); if (-1 == consumerOffset) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("THe consumer group <%s> not exist", group)); @@ -173,7 +171,7 @@ public class Broker2Client { requestHeader.setGroup(group); requestHeader.setTimestamp(timeStamp); RemotingCommand request = - RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader); + RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader); if (isC) { // c++ language ResetOffsetBodyForC body = new ResetOffsetBodyForC(); @@ -188,37 +186,37 @@ public class Broker2Client { } ConsumerGroupInfo consumerGroupInfo = - this.brokerController.getConsumerManager().getConsumerGroupInfo(group); + this.brokerController.getConsumerManager().getConsumerGroupInfo(group); if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) { ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = - consumerGroupInfo.getChannelInfoTable(); + consumerGroupInfo.getChannelInfoTable(); for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) { int version = entry.getValue().getVersion(); if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) { try { this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000); log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}", - topic, group, entry.getValue().getClientId()); + topic, group, entry.getValue().getClientId()); } catch (Exception e) { log.error("[reset-offset] reset offset exception. topic={}, group={}", - new Object[]{topic, group}, e); + new Object[] {topic, group}, e); } } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("the client does not support this feature. version=" - + MQVersion.getVersionDesc(version)); + + MQVersion.getVersionDesc(version)); log.warn("[reset-offset] the client does not support this feature. version={}", - RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); + RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); return response; } } } else { String errorInfo = - String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d", - requestHeader.getGroup(), - requestHeader.getTopic(), - requestHeader.getTimestamp()); + String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d", + requestHeader.getGroup(), + requestHeader.getTopic(), + requestHeader.getTimestamp()); log.error(errorInfo); response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); response.setRemark(errorInfo); @@ -236,7 +234,7 @@ public class Broker2Client { for (Entry<MessageQueue, Long> entry : table.entrySet()) { MessageQueue mq = entry.getKey(); MessageQueueForC tmp = - new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue()); + new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue()); list.add(tmp); } return list; @@ -249,13 +247,13 @@ public class Broker2Client { requestHeader.setTopic(topic); requestHeader.setGroup(group); RemotingCommand request = - RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, - requestHeader); + RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, + requestHeader); Map<String, Map<MessageQueue, Long>> consumerStatusTable = - new HashMap<String, Map<MessageQueue, Long>>(); + new HashMap<String, Map<MessageQueue, Long>>(); ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = - this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable(); + this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable(); if (null == channelInfoTable || channelInfoTable.isEmpty()) { result.setCode(ResponseCode.SYSTEM_ERROR); result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group)); @@ -268,26 +266,26 @@ public class Broker2Client { if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) { result.setCode(ResponseCode.SYSTEM_ERROR); result.setRemark("the client does not support this feature. version=" - + MQVersion.getVersionDesc(version)); + + MQVersion.getVersionDesc(version)); log.warn("[get-consumer-status] the client does not support this feature. version={}", - RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); + RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); return result; } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) { try { RemotingCommand response = - this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000); + this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { if (response.getBody() != null) { GetConsumerStatusBody body = - GetConsumerStatusBody.decode(response.getBody(), - GetConsumerStatusBody.class); + GetConsumerStatusBody.decode(response.getBody(), + GetConsumerStatusBody.class); consumerStatusTable.put(clientId, body.getMessageQueueTable()); log.info( - "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}", - topic, group, clientId); + "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}", + topic, group, clientId); } } default: @@ -295,8 +293,8 @@ public class Broker2Client { } } catch (Exception e) { log.error( - "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}", - new Object[]{topic, group}, e); + "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}", + new Object[] {topic, group}, e); } if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java index 82ca014..88c044a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java @@ -16,25 +16,23 @@ */ package org.apache.rocketmq.broker.client.rebalance; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.message.MessageQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RebalanceLockManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME); private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty( - "rocketmq.broker.rebalance.lockMaxLiveTime", "60000")); + "rocketmq.broker.rebalance.lockMaxLiveTime", "60000")); private final Lock lock = new ReentrantLock(); private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable = - new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024); + new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024); public boolean tryLock(final String group, final MessageQueue mq, final String clientId) { @@ -54,9 +52,9 @@ public class RebalanceLockManager { lockEntry.setClientId(clientId); groupValue.put(mq, lockEntry); log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", // - group, // - clientId, // - mq); + group, // + clientId, // + mq); } if (lockEntry.isLocked(clientId)) { @@ -66,26 +64,24 @@ public class RebalanceLockManager { String oldClientId = lockEntry.getClientId(); - if (lockEntry.isExpired()) { lockEntry.setClientId(clientId); lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); log.warn( - "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // - mq); - return true; - } - - - log.warn( - "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", // + "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // group, // oldClientId, // clientId, // mq); + return true; + } + + log.warn( + "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", // + group, // + oldClientId, // + clientId, // + mq); return false; } finally { this.lock.unlock(); @@ -118,11 +114,10 @@ public class RebalanceLockManager { } public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs, - final String clientId) { + final String clientId) { Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size()); Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size()); - for (MessageQueue mq : mqs) { if (this.isLocked(group, mq, clientId)) { lockedMqs.add(mq); @@ -141,7 +136,6 @@ public class RebalanceLockManager { this.mqLockTable.put(group, groupValue); } - for (MessageQueue mq : notLockedMqs) { LockEntry lockEntry = groupValue.get(mq); if (null == lockEntry) { @@ -149,13 +143,12 @@ public class RebalanceLockManager { lockEntry.setClientId(clientId); groupValue.put(mq, lockEntry); log.info( - "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", // - group, // - clientId, // - mq); + "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", // + group, // + clientId, // + mq); } - if (lockEntry.isLocked(clientId)) { lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); lockedMqs.add(mq); @@ -164,27 +157,25 @@ public class RebalanceLockManager { String oldClientId = lockEntry.getClientId(); - if (lockEntry.isExpired()) { lockEntry.setClientId(clientId); lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); log.warn( - "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // - mq); + "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // + group, // + oldClientId, // + clientId, // + mq); lockedMqs.add(mq); continue; } - log.warn( - "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // - mq); + "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", // + group, // + oldClientId, // + clientId, // + mq); } } finally { this.lock.unlock(); @@ -209,27 +200,27 @@ public class RebalanceLockManager { if (lockEntry.getClientId().equals(clientId)) { groupValue.remove(mq); log.info("unlockBatch, Group: {} {} {}", - group, - mq, - clientId); + group, + mq, + clientId); } else { log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}", - lockEntry.getClientId(), - group, - mq, - clientId); - } - } else { - log.warn("unlockBatch, but mq not locked, Group: {} {} {}", + lockEntry.getClientId(), group, mq, clientId); + } + } else { + log.warn("unlockBatch, but mq not locked, Group: {} {} {}", + group, + mq, + clientId); } } } else { log.warn("unlockBatch, group not exist, Group: {} {}", - group, - clientId); + group, + clientId); } } finally { this.lock.unlock(); @@ -243,22 +234,18 @@ public class RebalanceLockManager { private String clientId; private volatile long lastUpdateTimestamp = System.currentTimeMillis(); - public String getClientId() { return clientId; } - public void setClientId(String clientId) { this.clientId = clientId; } - public long getLastUpdateTimestamp() { return lastUpdateTimestamp; } - public void setLastUpdateTimestamp(long lastUpdateTimestamp) { this.lastUpdateTimestamp = lastUpdateTimestamp; } @@ -270,7 +257,7 @@ public class RebalanceLockManager { public boolean isExpired() { boolean expired = - (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; + (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; return expired; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java index 5b86d99..c1b67ae 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java @@ -17,15 +17,7 @@ package org.apache.rocketmq.broker.filtersrv; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.BrokerStartup; -import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.remoting.common.RemotingUtil; import io.netty.channel.Channel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -34,18 +26,24 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.BrokerStartup; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FilterServerManager { public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000; private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable = - new ConcurrentHashMap<Channel, FilterServerInfo>(16); + new ConcurrentHashMap<Channel, FilterServerInfo>(16); private final BrokerController brokerController; private ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread")); + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread")); public FilterServerManager(final BrokerController brokerController) { this.brokerController = brokerController; @@ -67,7 +65,7 @@ public class FilterServerManager { public void createFilterServer() { int more = - this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size(); + this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size(); String cmd = this.buildStartCommand(); for (int i = 0; i < more; i++) { FilterServerUtil.callShell(cmd, log); @@ -86,12 +84,12 @@ public class FilterServerManager { if (RemotingUtil.isWindowsPlatform()) { return String.format("start /b %s\\bin\\mqfiltersrv.exe %s", - this.brokerController.getBrokerConfig().getRocketmqHome(), - config); + this.brokerController.getBrokerConfig().getRocketmqHome(), + config); } else { return String.format("sh %s/bin/startfsrv.sh %s", - this.brokerController.getBrokerConfig().getRocketmqHome(), - config); + this.brokerController.getBrokerConfig().getRocketmqHome(), + config); } } @@ -134,7 +132,7 @@ public class FilterServerManager { FilterServerInfo old = this.filterServerTable.remove(channel); if (old != null) { log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(), - remoteAddr); + remoteAddr); } } @@ -152,22 +150,18 @@ public class FilterServerManager { private String filterServerAddr; private long lastUpdateTimestamp; - public String getFilterServerAddr() { return filterServerAddr; } - public void setFilterServerAddr(String filterServerAddr) { this.filterServerAddr = filterServerAddr; } - public long getLastUpdateTimestamp() { return lastUpdateTimestamp; } - public void setLastUpdateTimestamp(long lastUpdateTimestamp) { this.lastUpdateTimestamp = lastUpdateTimestamp; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java index de4cc37..9edfbe4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java @@ -6,20 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.filtersrv; import org.slf4j.Logger; - public class FilterServerUtil { public static void callShell(final String shellString, final Logger log) { Process process = null; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index 5359368..f616e33 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -6,16 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.latency; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; @@ -24,21 +27,27 @@ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - public class BrokerFastFailure { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "BrokerFastFailureScheduledThread")); + "BrokerFastFailureScheduledThread")); private final BrokerController brokerController; public BrokerFastFailure(final BrokerController brokerController) { this.brokerController = brokerController; } + public static RequestTask castRunnable(final Runnable runnable) { + try { + FutureTaskExt object = (FutureTaskExt)runnable; + return (RequestTask)object.getRunnable(); + } catch (Throwable e) { + log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e); + } + + return null; + } + public void start() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override @@ -95,17 +104,6 @@ public class BrokerFastFailure { } } - public static RequestTask castRunnable(final Runnable runnable) { - try { - FutureTaskExt object = (FutureTaskExt) runnable; - return (RequestTask) object.getRunnable(); - } catch (Throwable e) { - log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e); - } - - return null; - } - public void shutdown() { this.scheduledExecutorService.shutdown(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java index 8c4c5e8..5a4ad2d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java @@ -17,22 +17,31 @@ package org.apache.rocketmq.broker.latency; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor { - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue) { + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, + final BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) { + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, + final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) { + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, + final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) { + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, + final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java index e261b40..0ee02ad 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java @@ -6,37 +6,33 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.longpolling; import java.util.ArrayList; import java.util.List; - public class ManyPullRequest { private final ArrayList<PullRequest> pullRequestList = new ArrayList<PullRequest>(); - public synchronized void addPullRequest(final PullRequest pullRequest) { this.pullRequestList.add(pullRequest); } - public synchronized void addPullRequest(final List<PullRequest> many) { this.pullRequestList.addAll(many); } - public synchronized List<PullRequest> cloneListAndClear() { if (!this.pullRequestList.isEmpty()) { - List<PullRequest> result = (ArrayList<PullRequest>) this.pullRequestList.clone(); + List<PullRequest> result = (ArrayList<PullRequest>)this.pullRequestList.clone(); this.pullRequestList.clear(); return result; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java index f953c1e..97e54f9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java @@ -6,29 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.longpolling; import org.apache.rocketmq.store.MessageArrivingListener; - public class NotifyMessageArrivingListener implements MessageArrivingListener { private final PullRequestHoldService pullRequestHoldService; - public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) { this.pullRequestHoldService = pullRequestHoldService; } - @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode) { this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java index 40716f8..1cceec8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java @@ -6,20 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.longpolling; +import io.netty.channel.Channel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.Channel; - public class PullRequest { private final RemotingCommand requestCommand; @@ -29,9 +28,8 @@ public class PullRequest { private final long pullFromThisOffset; private final SubscriptionData subscriptionData; - public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp, - long pullFromThisOffset, SubscriptionData subscriptionData) { + long pullFromThisOffset, SubscriptionData subscriptionData) { this.requestCommand = requestCommand; this.clientChannel = clientChannel; this.timeoutMillis = timeoutMillis; @@ -40,27 +38,22 @@ public class PullRequest { this.subscriptionData = subscriptionData; } - public RemotingCommand getRequestCommand() { return requestCommand; } - public Channel getClientChannel() { return clientChannel; } - public long getTimeoutMillis() { return timeoutMillis; } - public long getSuspendTimestamp() { return suspendTimestamp; } - public long getPullFromThisOffset() { return pullFromThisOffset; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index 5182664..0e5be9b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -16,6 +16,9 @@ */ package org.apache.rocketmq.broker.longpolling; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.SystemClock; @@ -25,11 +28,6 @@ import org.apache.rocketmq.store.MessageFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - - public class PullRequestHoldService extends ServiceThread { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final String TOPIC_QUEUEID_SEPARATOR = "@"; @@ -37,8 +35,7 @@ public class PullRequestHoldService extends ServiceThread { private final SystemClock systemClock = new SystemClock(); private final MessageFilter messageFilter = new DefaultMessageFilter(); private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable = - new ConcurrentHashMap<String, ManyPullRequest>(1024); - + new ConcurrentHashMap<String, ManyPullRequest>(1024); public PullRequestHoldService(final BrokerController brokerController) { this.brokerController = brokerController; @@ -135,7 +132,7 @@ public class PullRequestHoldService extends ServiceThread { if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) { try { this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), - request.getRequestCommand()); + request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } @@ -146,14 +143,13 @@ public class PullRequestHoldService extends ServiceThread { if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), - request.getRequestCommand()); + request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } - replayList.add(request); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java index 3a167fa..cc2f218 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java @@ -6,20 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.mqtrace; -import org.apache.rocketmq.store.stats.BrokerStatsManager; - import java.util.Map; - +import org.apache.rocketmq.store.stats.BrokerStatsManager; public class ConsumeMessageContext { private String consumerGroup; @@ -38,102 +36,82 @@ public class ConsumeMessageContext { private int commercialRcvTimes; private int commercialRcvSize; - public String getConsumerGroup() { return consumerGroup; } - public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } - public String getTopic() { return topic; } - public void setTopic(String topic) { this.topic = topic; } - public Integer getQueueId() { return queueId; } - public void setQueueId(Integer queueId) { this.queueId = queueId; } - public String getClientHost() { return clientHost; } - public void setClientHost(String clientHost) { this.clientHost = clientHost; } - public String getStoreHost() { return storeHost; } - public void setStoreHost(String storeHost) { this.storeHost = storeHost; } - public Map<String, Long> getMessageIds() { return messageIds; } - public void setMessageIds(Map<String, Long> messageIds) { this.messageIds = messageIds; } - public boolean isSuccess() { return success; } - public void setSuccess(boolean success) { this.success = success; } - public String getStatus() { return status; } - public void setStatus(String status) { this.status = status; } - public Object getMqTraceContext() { return mqTraceContext; } - public void setMqTraceContext(Object mqTraceContext) { this.mqTraceContext = mqTraceContext; } - public int getBodyLength() { return bodyLength; } - public void setBodyLength(int bodyLength) { this.bodyLength = bodyLength; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java index c4b7f36..7e724a0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java @@ -6,22 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.mqtrace; public interface ConsumeMessageHook { String hookName(); - void consumeMessageBefore(final ConsumeMessageContext context); - void consumeMessageAfter(final ConsumeMessageContext context); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java index ca8121d..8ad7919 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java @@ -6,22 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.mqtrace; +import java.util.Properties; import org.apache.rocketmq.common.message.MessageType; import org.apache.rocketmq.store.stats.BrokerStatsManager; -import java.util.Properties; - - public class SendMessageContext { private String producerGroup; private String topic; @@ -92,137 +90,110 @@ public class SendMessageContext { return producerGroup; } - public void setProducerGroup(String producerGroup) { this.producerGroup = producerGroup; } - public String getTopic() { return topic; } - public void setTopic(String topic) { this.topic = topic; } - public String getMsgId() { return msgId; } - public void setMsgId(String msgId) { this.msgId = msgId; } - public String getOriginMsgId() { return originMsgId; } - public void setOriginMsgId(String originMsgId) { this.originMsgId = originMsgId; } - public Integer getQueueId() { return queueId; } - public void setQueueId(Integer queueId) { this.queueId = queueId; } - public Long getQueueOffset() { return queueOffset; } - public void setQueueOffset(Long queueOffset) { this.queueOffset = queueOffset; } - public String getBrokerAddr() { return brokerAddr; } - public void setBrokerAddr(String brokerAddr) { this.brokerAddr = brokerAddr; } - public String getBornHost() { return bornHost; } - public void setBornHost(String bornHost) { this.bornHost = bornHost; } - public int getBodyLength() { return bodyLength; } - public void setBodyLength(int bodyLength) { this.bodyLength = bodyLength; } - public int getCode() { return code; } - public void setCode(int code) { this.code = code; } - public String getErrorMsg() { return errorMsg; } - public void setErrorMsg(String errorMsg) { this.errorMsg = errorMsg; } - public String getMsgProps() { return msgProps; } - public void setMsgProps(String msgProps) { this.msgProps = msgProps; } - public Object getMqTraceContext() { return mqTraceContext; } - public void setMqTraceContext(Object mqTraceContext) { this.mqTraceContext = mqTraceContext; } - public Properties getExtProps() { return extProps; } - public void setExtProps(Properties extProps) { this.extProps = extProps; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java index 84cbdcb..a84d899 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java @@ -6,22 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.mqtrace; public interface SendMessageHook { public String hookName(); - public void sendMessageBefore(final SendMessageContext context); - public void sendMessageAfter(final SendMessageContext context); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 06ceb36..9346067 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -6,16 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.offset; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -25,30 +32,22 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - - public class ConsumerOffsetManager extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final String TOPIC_GROUP_SEPARATOR = "@"; private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = - new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512); + new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512); private transient BrokerController brokerController; - public ConsumerOffsetManager() { } - public ConsumerOffsetManager(BrokerController brokerController) { this.brokerController = brokerController; } - public void scanUnsubscribedTopic() { Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { @@ -60,7 +59,7 @@ public class ConsumerOffsetManager extends ConfigManager { String group = arrays[1]; if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic) - && this.offsetBehindMuchThanData(topic, next.getValue())) { + && this.offsetBehindMuchThanData(topic, next.getValue())) { it.remove(); log.warn("remove topic offset, {}", topicAtGroup); } @@ -68,7 +67,6 @@ public class ConsumerOffsetManager extends ConfigManager { } } - private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) { Iterator<Entry<Integer, Long>> it = table.entrySet().iterator(); boolean result = !table.isEmpty(); @@ -83,7 +81,6 @@ public class ConsumerOffsetManager extends ConfigManager { return result; } - public Set<String> whichTopicByConsumer(final String group) { Set<String> topics = new HashSet<String>(); @@ -102,7 +99,6 @@ public class ConsumerOffsetManager extends ConfigManager { return topics; } - public Set<String> whichGroupByTopic(final String topic) { Set<String> groups = new HashSet<String>(); @@ -121,7 +117,6 @@ public class ConsumerOffsetManager extends ConfigManager { return groups; } - public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) { // topic@group String key = topic + TOPIC_GROUP_SEPARATOR + group; @@ -182,12 +177,10 @@ public class ConsumerOffsetManager extends ConfigManager { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) { this.offsetTable = offsetTable; } - public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) { Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>(); @@ -224,14 +217,12 @@ public class ConsumerOffsetManager extends ConfigManager { return queueMinOffset; } - public Map<Integer, Long> queryOffset(final String group, final String topic) { // topic@group String key = topic + TOPIC_GROUP_SEPARATOR + group; return this.offsetTable.get(key); } - public void cloneOffset(final String srcGroup, final String destGroup, final String topic) { ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup); if (offsets != null) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 3d969c4..25b333a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -16,6 +16,9 @@ */ package org.apache.rocketmq.broker.out; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -23,24 +26,27 @@ import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.*; +import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.remoting.exception.*; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.List; - - public class BrokerOuterAPI { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final RemotingClient remotingClient; @@ -92,15 +98,15 @@ public class BrokerOuterAPI { } public RegisterBrokerResult registerBrokerAll( - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId, - final String haServerAddr, - final TopicConfigSerializeWrapper topicConfigWrapper, - final List<String> filterServerList, - final boolean oneway, - final int timeoutMills) { + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId, + final String haServerAddr, + final TopicConfigSerializeWrapper topicConfigWrapper, + final List<String> filterServerList, + final boolean oneway, + final int timeoutMills) { RegisterBrokerResult registerBrokerResult = null; List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); @@ -108,7 +114,7 @@ public class BrokerOuterAPI { for (String namesrvAddr : nameServerAddressList) { try { RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, - haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills); + haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills); if (result != null) { registerBrokerResult = result; } @@ -124,18 +130,18 @@ public class BrokerOuterAPI { } private RegisterBrokerResult registerBroker( - final String namesrvAddr, - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId, - final String haServerAddr, - final TopicConfigSerializeWrapper topicConfigWrapper, - final List<String> filterServerList, - final boolean oneway, - final int timeoutMills + final String namesrvAddr, + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId, + final String haServerAddr, + final TopicConfigSerializeWrapper topicConfigWrapper, + final List<String> filterServerList, + final boolean oneway, + final int timeoutMills ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException { + InterruptedException { RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); @@ -163,7 +169,7 @@ public class BrokerOuterAPI { switch (response.getCode()) { case ResponseCode.SUCCESS: { RegisterBrokerResponseHeader responseHeader = - (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); + (RegisterBrokerResponseHeader)response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); RegisterBrokerResult result = new RegisterBrokerResult(); result.setMasterAddr(responseHeader.getMasterAddr()); result.setHaServerAddr(responseHeader.getHaServerAddr()); @@ -181,10 +187,10 @@ public class BrokerOuterAPI { } public void unregisterBrokerAll( - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId ) { List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null) { @@ -200,11 +206,11 @@ public class BrokerOuterAPI { } public void unregisterBroker( - final String namesrvAddr, - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId + final String namesrvAddr, + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); @@ -227,7 +233,7 @@ public class BrokerOuterAPI { } public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, InterruptedException, MQBrokerException { + RemotingTimeoutException, InterruptedException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000); @@ -244,7 +250,7 @@ public class BrokerOuterAPI { } public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); assert response != null; @@ -260,7 +266,7 @@ public class BrokerOuterAPI { } public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, - RemotingConnectException, MQBrokerException, UnsupportedEncodingException { + RemotingConnectException, MQBrokerException, UnsupportedEncodingException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null); RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); assert response != null; @@ -276,7 +282,7 @@ public class BrokerOuterAPI { } public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); assert response != null; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java index e4c3045..1b1d5bb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java @@ -6,38 +6,34 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.pagecache; -import org.apache.rocketmq.store.GetMessageResult; import io.netty.channel.FileRegion; import io.netty.util.AbstractReferenceCounted; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.util.List; - +import org.apache.rocketmq.store.GetMessageResult; public class ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion { private final ByteBuffer byteBufferHeader; private final GetMessageResult getMessageResult; private long transfered; // the bytes which was transfered already - public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) { this.byteBufferHeader = byteBufferHeader; this.getMessageResult = getMessageResult; } - @Override public long position() { int pos = byteBufferHeader.position(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java index 3f00ece..b4cf0da 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java @@ -6,37 +6,33 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.pagecache; -import org.apache.rocketmq.store.SelectMappedBufferResult; import io.netty.channel.FileRegion; import io.netty.util.AbstractReferenceCounted; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; - +import org.apache.rocketmq.store.SelectMappedBufferResult; public class OneMessageTransfer extends AbstractReferenceCounted implements FileRegion { private final ByteBuffer byteBufferHeader; private final SelectMappedBufferResult selectMappedBufferResult; private long transfered; // the bytes which was transfered already - public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) { this.byteBufferHeader = byteBufferHeader; this.selectMappedBufferResult = selectMappedBufferResult; } - @Override public long position() { return this.byteBufferHeader.position() + this.selectMappedBufferResult.getByteBuffer().position();