http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java deleted file mode 100644 index 19016ca..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package com.alibaba.rocketmq.client.impl;

import com.alibaba.rocketmq.client.ClientConfig;
import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
import com.alibaba.rocketmq.remoting.RPCHook;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * Singleton registry that hands out one {@link MQClientInstance} per client id.
 *
 * <p>Instances are kept in a {@link ConcurrentHashMap} keyed by the id returned from
 * {@link ClientConfig#buildMQClientId()}; registration uses {@code putIfAbsent}, so
 * concurrent callers asking for the same id all receive the same instance.
 *
 * @author shijia.wxr
 */
public class MQClientManager {
    /** The single manager; final so the reference can never be replaced after class init. */
    private static final MQClientManager instance = new MQClientManager();

    /** Source of the index passed to each newly constructed {@link MQClientInstance}. */
    private final AtomicInteger factoryIndexGenerator = new AtomicInteger();

    private final ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable =
        new ConcurrentHashMap<String, MQClientInstance>();


    private MQClientManager() {

    }


    /**
     * @return the shared manager.
     */
    public static MQClientManager getInstance() {
        return instance;
    }


    /**
     * Same as {@link #getAndCreateMQClientInstance(ClientConfig, RPCHook)} with a
     * {@code null} RPC hook.
     *
     * @param clientConfig configuration used to derive the client id
     * @return the registered client instance for that id
     */
    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {
        return getAndCreateMQClientInstance(clientConfig, null);
    }


    /**
     * Returns the client instance registered under the id built from {@code clientConfig},
     * creating and registering one if none exists yet.
     *
     * <p>A new instance is built from a clone of {@code clientConfig}. If another thread
     * registers the same id first, the freshly built instance is discarded and the
     * already-registered one is returned. The index taken from the generator is consumed
     * either way.
     *
     * @param clientConfig configuration used to derive the client id
     * @param rpcHook hook handed to a newly created instance; may be {@code null}
     * @return the registered client instance for that id
     */
    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        final String clientId = clientConfig.buildMQClientId();

        // Local names deliberately differ from the static 'instance' field to avoid shadowing it.
        final MQClientInstance registered = this.factoryTable.get(clientId);
        if (registered != null) {
            return registered;
        }

        final MQClientInstance created =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        final MQClientInstance raced = this.factoryTable.putIfAbsent(clientId, created);
        if (raced != null) {
            // Lost the race: another thread registered this client id first.
            return raced;
        }

        // TODO log
        return created;
    }


    /**
     * Removes the instance registered under {@code clientId}, if any.
     *
     * @param clientId id of the client instance to forget
     */
    public void removeClientFactory(final String clientId) {
        this.factoryTable.remove(clientId);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java deleted file mode 100644 index 4dee764..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ /dev/null @@ -1,471 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package com.alibaba.rocketmq.client.impl.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeReturnType;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.client.stat.ConsumerStatsManager;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.alibaba.rocketmq.common.message.MessageAccessor;
import com.alibaba.rocketmq.common.message.MessageConst;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.body.CMResult;
import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;

import java.util.*;
import java.util.concurrent.*;


/**
 * {@link ConsumeMessageService} that hands batches of messages to a
 * {@link MessageListenerConcurrently} on a thread pool.
 *
 * <p>Three executors are owned by this service: the consume pool that runs
 * {@link ConsumeRequest}s, a single-thread scheduler used to retry submissions after a
 * delay, and a single-thread scheduler that periodically calls
 * {@code ProcessQueue.cleanExpiredMsg} on every process queue.
 *
 * @author shijia.wxr
 */
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private static final Logger log = ClientLogger.getLog();
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private final MessageListenerConcurrently messageListener;
    private final BlockingQueue<Runnable> consumeRequestQueue;
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;

    private final ScheduledExecutorService scheduledExecutorService;
    private final ScheduledExecutorService cleanExpireMsgExecutors;


    /**
     * Builds the service and its executors; nothing is scheduled until {@link #start()}.
     *
     * @param defaultMQPushConsumerImpl owning consumer implementation
     * @param messageListener user listener that receives the messages
     */
    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;

        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

        this.consumeExecutor = new ThreadPoolExecutor(//
            this.defaultMQPushConsumer.getConsumeThreadMin(), //
            this.defaultMQPushConsumer.getConsumeThreadMax(), //
            1000 * 60, //
            TimeUnit.MILLISECONDS, //
            this.consumeRequestQueue, //
            new ThreadFactoryImpl("ConsumeMessageThread_"));

        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }


    /**
     * Schedules the expired-message cleanup at a fixed rate. Both the initial delay and
     * the period are {@code getConsumeTimeout()}, interpreted in minutes.
     */
    public void start() {
        final long consumeTimeoutMinutes = this.defaultMQPushConsumer.getConsumeTimeout();
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                cleanExpireMsg();
            }

        }, consumeTimeoutMinutes, consumeTimeoutMinutes, TimeUnit.MINUTES);
    }


    /**
     * Initiates an orderly shutdown of the retry scheduler, the consume pool and the
     * cleanup scheduler, in that order. Does not wait for running tasks to finish.
     */
    public void shutdown() {
        this.scheduledExecutorService.shutdown();
        this.consumeExecutor.shutdown();
        this.cleanExpireMsgExecutors.shutdown();
    }


    /**
     * Sets the consume pool's core size. The request is silently ignored unless
     * {@code 0 < corePoolSize <= Short.MAX_VALUE} and {@code corePoolSize} is strictly
     * below the configured maximum consume thread count.
     *
     * @param corePoolSize requested core pool size
     */
    @Override
    public void updateCorePoolSize(int corePoolSize) {
        if (corePoolSize > 0 //
            && corePoolSize <= Short.MAX_VALUE //
            && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
            this.consumeExecutor.setCorePoolSize(corePoolSize);
        }
    }


    /**
     * Intentionally a no-op in this implementation.
     */
    @Override
    public void incCorePoolSize() {
    }


    /**
     * Intentionally a no-op in this implementation.
     */
    @Override
    public void decCorePoolSize() {
    }


    /**
     * @return the consume pool's current core size
     */
    @Override
    public int getCorePoolSize() {
        return this.consumeExecutor.getCorePoolSize();
    }


    /**
     * Invokes the listener synchronously on the calling thread for one message and
     * reports the outcome. The consume pool, the process queue and the offset store are
     * not involved.
     *
     * @param msg message to consume
     * @param brokerName broker name recorded on the {@link MessageQueue} given to the listener
     * @return result carrying the mapped consume status, a remark when the listener threw,
     *         and the elapsed time in milliseconds
     */
    @Override
    public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
        ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
        result.setOrder(false);
        result.setAutoCommit(true);

        List<MessageExt> msgs = new ArrayList<MessageExt>();
        msgs.add(msg);
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(brokerName);
        mq.setTopic(msg.getTopic());
        mq.setQueueId(msg.getQueueId());

        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(mq);

        this.resetRetryTopic(msgs);

        final long beginTime = System.currentTimeMillis();

        log.info("consumeMessageDirectly receive new messge: {}", msg);

        try {
            ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);
            if (status != null) {
                switch (status) {
                    case CONSUME_SUCCESS:
                        result.setConsumeResult(CMResult.CR_SUCCESS);
                        break;
                    case RECONSUME_LATER:
                        result.setConsumeResult(CMResult.CR_LATER);
                        break;
                    default:
                        break;
                }
            } else {
                result.setConsumeResult(CMResult.CR_RETURN_NULL);
            }
        } catch (Throwable e) {
            result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
            result.setRemark(RemotingHelper.exceptionSimpleDesc(e));

            log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", //
                RemotingHelper.exceptionSimpleDesc(e), //
                ConsumeMessageConcurrentlyService.this.consumerGroup, //
                msgs, //
                mq), e);
        }

        result.setSpentTimeMills(System.currentTimeMillis() - beginTime);

        log.info("consumeMessageDirectly Result: {}", result);

        return result;
    }


    /**
     * Submits messages to the consume pool, split into requests of at most
     * {@code getConsumeMessageBatchMaxSize()} messages each.
     *
     * <p>If the pool rejects a request, that request is retried after a delay. When this
     * happens while splitting, all not-yet-submitted messages are folded into the
     * rejected request so that nothing is dropped.
     *
     * @param msgs messages to consume
     * @param processQueue process queue the messages belong to
     * @param messageQueue message queue the messages were pulled from
     * @param dispatchToConsume not used by this implementation
     */
    @Override
    public void submitConsumeRequest(//
        final List<MessageExt> msgs, //
        final ProcessQueue processQueue, //
        final MessageQueue messageQueue, //
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
            return;
        }

        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize && total < msgs.size(); i++, total++) {
                msgThis.add(msgs.get(total));
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                // msgThis is the list held by consumeRequest: append everything that is left
                // so the delayed retry carries all remaining messages; this also ends the loop.
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }


    /**
     * Restores the original topic on messages that arrived through this group's retry
     * topic: when a message carries {@code PROPERTY_RETRY_TOPIC} and its current topic is
     * the group's retry topic, its topic is replaced by that property's value.
     *
     * @param msgs messages to fix up in place
     */
    public void resetRetryTopic(final List<MessageExt> msgs) {
        final String groupTopic = MixAll.getRetryTopic(consumerGroup);
        for (MessageExt msg : msgs) {
            String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
            if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
                msg.setTopic(retryTopic);
            }
        }
    }


    /**
     * Asks every process queue in the rebalance table to clean its expired messages.
     */
    private void cleanExpireMsg() {
        for (Map.Entry<MessageQueue, ProcessQueue> entry
            : this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet()) {
            entry.getValue().cleanExpiredMsg(this.defaultMQPushConsumer);
        }
    }


    /**
     * Applies the listener's verdict for one request: updates TPS statistics, deals with
     * the messages that were not acknowledged, and advances the consume offset.
     *
     * <p>Messages after the context's ack index count as failed; {@code RECONSUME_LATER}
     * marks the whole batch as failed. In BROADCASTING mode failed messages are only
     * logged. In CLUSTERING mode each failed message is sent back via
     * {@link #sendMessageBack}; those that cannot be sent back get their reconsume count
     * incremented, are taken out of this request and are resubmitted locally after a delay.
     * Finally the request's remaining messages are removed from the process queue and,
     * if the returned offset is non-negative and the queue is not dropped, the offset
     * store is updated.
     *
     * @param status listener result; must not be {@code null}
     * @param context context that was passed to the listener
     * @param consumeRequest request whose messages were consumed
     */
    public void processConsumeResult(//
        final ConsumeConcurrentlyStatus status, //
        final ConsumeConcurrentlyContext context, //
        final ConsumeRequest consumeRequest//
    ) {
        int ackIndex = context.getAckIndex();

        final List<MessageExt> msgs = consumeRequest.getMsgs();
        if (msgs.isEmpty())
            return;

        switch (status) {
            case CONSUME_SUCCESS: {
                if (ackIndex >= msgs.size()) {
                    ackIndex = msgs.size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = msgs.size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            }
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    msgs.size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < msgs.size(); i++) {
                    MessageExt msg = msgs.get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING: {
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(msgs.size());
                for (int i = ackIndex + 1; i < msgs.size(); i++) {
                    MessageExt msg = msgs.get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    // Keep these out of the removeMessage call below; they are retried locally.
                    msgs.removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            }
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(msgs);
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }


    /**
     * @return the statistics manager of the owning consumer
     */
    public ConsumerStatsManager getConsumerStatsManager() {
        return this.defaultMQPushConsumerImpl.getConsumerStatsManager();
    }


    /**
     * Sends a failed message back through the owning consumer, using the delay level
     * taken from the context.
     *
     * @param msg message to send back
     * @param context context supplying the delay level and the broker name
     * @return {@code true} if the send-back call completed, {@code false} if it threw
     *         (the exception is logged, not propagated)
     */
    public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
        int delayLevel = context.getDelayLevelWhenNextConsume();

        try {
            this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
            return true;
        } catch (Exception e) {
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
        }

        return false;
    }


    /**
     * Re-runs {@link #submitConsumeRequest} for the given messages after 5000 ms.
     */
    private void submitConsumeRequestLater(//
        final List<MessageExt> msgs, //
        final ProcessQueue processQueue, //
        final MessageQueue messageQueue//
    ) {

        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
            }
        }, 5000, TimeUnit.MILLISECONDS);
    }


    /**
     * Re-submits an already built request to the consume pool after 5000 ms.
     */
    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest//
    ) {

        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
            }
        }, 5000, TimeUnit.MILLISECONDS);
    }


    /**
     * One unit of work for the consume pool: a batch of messages from a single queue.
     */
    class ConsumeRequest implements Runnable {
        private final List<MessageExt> msgs;
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;


        public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
            this.msgs = msgs;
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }


        public List<MessageExt> getMsgs() {
            return msgs;
        }


        public ProcessQueue getProcessQueue() {
            return processQueue;
        }


        /**
         * Runs the listener on this batch and then processes the result.
         *
         * <p>Nothing happens if the process queue is already dropped. Consume hooks, when
         * registered, are invoked before and after the listener. A listener that throws
         * or returns {@code null} is treated as {@code RECONSUME_LATER}. The result is
         * processed only if the queue is still not dropped afterwards.
         */
        @Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }

            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
            ConsumeConcurrentlyStatus status = null;

            // Stays null when no hook is registered; every later use must be guarded.
            ConsumeMessageContext consumeMessageContext = null;
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                consumeMessageContext.setProps(new HashMap<String, String>());
                consumeMessageContext.setMq(messageQueue);
                consumeMessageContext.setMsgList(msgs);
                consumeMessageContext.setSuccess(false);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
            }

            long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                // Pass the throwable as well so the stack trace is kept, as consumeMessageDirectly does.
                log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", //
                    RemotingHelper.exceptionSimpleDesc(e), //
                    ConsumeMessageConcurrentlyService.this.consumerGroup, //
                    msgs, //
                    messageQueue), e);
                hasException = true;
            }
            long consumeRT = System.currentTimeMillis() - beginTimestamp;
            if (null == status) {
                if (hasException) {
                    returnType = ConsumeReturnType.EXCEPTION;
                } else {
                    returnType = ConsumeReturnType.RETURNNULL;
                }
            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                // getConsumeTimeout() is in minutes (see start()); compare in milliseconds.
                returnType = ConsumeReturnType.TIME_OUT;
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                returnType = ConsumeReturnType.FAILED;
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                returnType = ConsumeReturnType.SUCCESS;
            }

            // Previously dereferenced unconditionally, which threw NullPointerException on
            // every consume whenever no hook was registered.
            if (consumeMessageContext != null) {
                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
            }

            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            // Run the after-hook only when the before-hook ran with a context of its own.
            if (consumeMessageContext != null) {
                consumeMessageContext.setStatus(status.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
            }

            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }


        public MessageQueue getMessageQueue() {
            return messageQueue;
        }


    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java deleted file mode 100644 index 82903b0..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ /dev/null @@ -1,536 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; -import com.alibaba.rocketmq.client.consumer.listener.*; -import com.alibaba.rocketmq.client.hook.ConsumeMessageContext; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.client.stat.ConsumerStatsManager; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.ThreadFactoryImpl; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.message.*; -import com.alibaba.rocketmq.common.protocol.body.CMResult; -import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import org.slf4j.Logger; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.*; - - -/** - * @author shijia.wxr - */ -public class ConsumeMessageOrderlyService implements ConsumeMessageService { - private static final Logger log = ClientLogger.getLog(); - private final static long MAX_TIME_CONSUME_CONTINUOUSLY = - Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000")); - private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; - private final DefaultMQPushConsumer defaultMQPushConsumer; - private final MessageListenerOrderly messageListener; - private final BlockingQueue<Runnable> consumeRequestQueue; - private final ThreadPoolExecutor consumeExecutor; - private final String consumerGroup; - private final MessageQueueLock messageQueueLock = new MessageQueueLock(); - private final ScheduledExecutorService scheduledExecutorService; - private volatile boolean stopped = false; - - - public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { - 
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; - this.messageListener = messageListener; - - this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); - this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); - this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); - - this.consumeExecutor = new ThreadPoolExecutor(// - this.defaultMQPushConsumer.getConsumeThreadMin(), // - this.defaultMQPushConsumer.getConsumeThreadMax(), // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.consumeRequestQueue, // - new ThreadFactoryImpl("ConsumeMessageThread_")); - - this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); - } - - - public void start() { - if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - ConsumeMessageOrderlyService.this.lockMQPeriodically(); - } - }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); - } - } - - - public void shutdown() { - this.stopped = true; - this.scheduledExecutorService.shutdown(); - this.consumeExecutor.shutdown(); - if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { - this.unlockAllMQ(); - } - } - - - public synchronized void unlockAllMQ() { - this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false); - } - - @Override - public void updateCorePoolSize(int corePoolSize) { - if (corePoolSize > 0 // - && corePoolSize <= Short.MAX_VALUE // - && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { - this.consumeExecutor.setCorePoolSize(corePoolSize); - } - } - - @Override - public void incCorePoolSize() { - } - - @Override - public void decCorePoolSize() { - } - - @Override - public int getCorePoolSize() { - return this.consumeExecutor.getCorePoolSize(); - } - - 
@Override - public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) { - ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult(); - result.setOrder(true); - - List<MessageExt> msgs = new ArrayList<MessageExt>(); - msgs.add(msg); - MessageQueue mq = new MessageQueue(); - mq.setBrokerName(brokerName); - mq.setTopic(msg.getTopic()); - mq.setQueueId(msg.getQueueId()); - - ConsumeOrderlyContext context = new ConsumeOrderlyContext(mq); - - final long beginTime = System.currentTimeMillis(); - - log.info("consumeMessageDirectly receive new messge: {}", msg); - - try { - ConsumeOrderlyStatus status = this.messageListener.consumeMessage(msgs, context); - if (status != null) { - switch (status) { - case COMMIT: - result.setConsumeResult(CMResult.CR_COMMIT); - break; - case ROLLBACK: - result.setConsumeResult(CMResult.CR_ROLLBACK); - break; - case SUCCESS: - result.setConsumeResult(CMResult.CR_SUCCESS); - break; - case SUSPEND_CURRENT_QUEUE_A_MOMENT: - result.setConsumeResult(CMResult.CR_LATER); - break; - default: - break; - } - } else { - result.setConsumeResult(CMResult.CR_RETURN_NULL); - } - } catch (Throwable e) { - result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); - result.setRemark(RemotingHelper.exceptionSimpleDesc(e)); - - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", // - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // - mq), e); - } - - result.setAutoCommit(context.isAutoCommit()); - result.setSpentTimeMills(System.currentTimeMillis() - beginTime); - - log.info("consumeMessageDirectly Result: {}", result); - - return result; - } - - @Override - public void submitConsumeRequest(// - final List<MessageExt> msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // - final boolean dispathToConsume) { - if (dispathToConsume) { - ConsumeRequest consumeRequest = new 
ConsumeRequest(processQueue, messageQueue); - this.consumeExecutor.submit(consumeRequest); - } - } - - public synchronized void lockMQPeriodically() { - if (!this.stopped) { - this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); - } - } - - public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue, final long delayMills) { - this.scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - boolean lockOK = ConsumeMessageOrderlyService.this.lockOneMQ(mq); - if (lockOK) { - ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 10); - } else { - ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 3000); - } - } - }, delayMills, TimeUnit.MILLISECONDS); - } - - public synchronized boolean lockOneMQ(final MessageQueue mq) { - if (!this.stopped) { - return this.defaultMQPushConsumerImpl.getRebalanceImpl().lock(mq); - } - - return false; - } - - private void submitConsumeRequestLater(// - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // - final long suspendTimeMillis// - ) { - long timeMillis = suspendTimeMillis; - if (timeMillis == -1) { - timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis(); - } - - if (timeMillis < 10) { - timeMillis = 10; - } else if (timeMillis > 30000) { - timeMillis = 30000; - } - - this.scheduledExecutorService.schedule(new Runnable() { - - @Override - public void run() { - ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true); - } - }, timeMillis, TimeUnit.MILLISECONDS); - } - - public boolean processConsumeResult(// - final List<MessageExt> msgs, // - final ConsumeOrderlyStatus status, // - final ConsumeOrderlyContext context, // - final ConsumeRequest consumeRequest// - ) { - boolean continueConsume = true; - long commitOffset = -1L; - if (context.isAutoCommit()) { - switch (status) { - case COMMIT: - case ROLLBACK: - log.warn("the message 
queue consume result is illegal, we think you want to ack these message {}", - consumeRequest.getMessageQueue()); - case SUCCESS: - commitOffset = consumeRequest.getProcessQueue().commit(); - this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); - break; - case SUSPEND_CURRENT_QUEUE_A_MOMENT: - this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); - if (checkReconsumeTimes(msgs)) { - consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); - this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // - context.getSuspendCurrentQueueTimeMillis()); - continueConsume = false; - } else { - commitOffset = consumeRequest.getProcessQueue().commit(); - } - break; - default: - break; - } - } else { - switch (status) { - case SUCCESS: - this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); - break; - case COMMIT: - commitOffset = consumeRequest.getProcessQueue().commit(); - break; - case ROLLBACK: - consumeRequest.getProcessQueue().rollback(); - this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // - context.getSuspendCurrentQueueTimeMillis()); - continueConsume = false; - break; - case SUSPEND_CURRENT_QUEUE_A_MOMENT: - this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); - if (checkReconsumeTimes(msgs)) { - consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); - this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // - context.getSuspendCurrentQueueTimeMillis()); - continueConsume = false; - } - break; - default: - break; - } - } - - if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { - 
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); - } - - return continueConsume; - } - - public ConsumerStatsManager getConsumerStatsManager() { - return this.defaultMQPushConsumerImpl.getConsumerStatsManager(); - } - - private int getMaxReconsumeTimes() { - // default reconsume times: Integer.MAX_VALUE - if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) { - return Integer.MAX_VALUE; - } else { - return this.defaultMQPushConsumer.getMaxReconsumeTimes(); - } - } - - private boolean checkReconsumeTimes(List<MessageExt> msgs) { - boolean suspend = false; - if (msgs != null && !msgs.isEmpty()) { - for (MessageExt msg : msgs) { - if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) { - MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes())); - if (!sendMessageBack(msg)) { - suspend = true; - msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); - } - } else { - suspend = true; - msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); - } - } - } - return suspend; - } - - public boolean sendMessageBack(final MessageExt msg) { - try { - // max reconsume times exceeded then send to dead letter queue. - Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); - String originMsgId = MessageAccessor.getOriginMessageId(msg); - MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? 
msg.getMsgId() : originMsgId); - newMsg.setFlag(msg.getFlag()); - MessageAccessor.setProperties(newMsg, msg.getProperties()); - MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); - MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes())); - MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); - newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); - - this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg); - return true; - } catch (Exception e) { - log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e); - } - - return false; - } - - class ConsumeRequest implements Runnable { - private final ProcessQueue processQueue; - private final MessageQueue messageQueue; - - - public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) { - this.processQueue = processQueue; - this.messageQueue = messageQueue; - } - - public ProcessQueue getProcessQueue() { - return processQueue; - } - - public MessageQueue getMessageQueue() { - return messageQueue; - } - - @Override - public void run() { - if (this.processQueue.isDropped()) { - log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); - return; - } - - final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); - synchronized (objLock) { - if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) - || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { - final long beginTime = System.currentTimeMillis(); - for (boolean continueConsume = true; continueConsume; ) { - if (this.processQueue.isDropped()) { - log.warn("the message queue not be able to consume, because it's dropped. 
{}", this.messageQueue); - break; - } - - if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) - && !this.processQueue.isLocked()) { - log.warn("the message queue not locked, so consume later, {}", this.messageQueue); - ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); - break; - } - - if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) - && this.processQueue.isLockExpired()) { - log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); - ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); - break; - } - - long interval = System.currentTimeMillis() - beginTime; - if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { - ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); - break; - } - - final int consumeBatchSize = - ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); - - List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); - if (!msgs.isEmpty()) { - final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); - - ConsumeOrderlyStatus status = null; - - ConsumeMessageContext consumeMessageContext = null; - if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { - consumeMessageContext = new ConsumeMessageContext(); - consumeMessageContext - .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); - consumeMessageContext.setMq(messageQueue); - consumeMessageContext.setMsgList(msgs); - consumeMessageContext.setSuccess(false); - // init the consume context type - consumeMessageContext.setProps(new HashMap<String, String>()); - ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); - } - - long beginTimestamp = 
System.currentTimeMillis(); - ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; - boolean hasException = false; - try { - this.processQueue.getLockConsume().lock(); - if (this.processQueue.isDropped()) { - log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", - this.messageQueue); - break; - } - - status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); - } catch (Throwable e) { - log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", // - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // - messageQueue); - hasException = true; - } finally { - this.processQueue.getLockConsume().unlock(); - } - - if (null == status // - || ConsumeOrderlyStatus.ROLLBACK == status// - || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { - log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // - messageQueue); - } - - long consumeRT = System.currentTimeMillis() - beginTimestamp; - if (null == status) { - if (hasException) { - returnType = ConsumeReturnType.EXCEPTION; - } else { - returnType = ConsumeReturnType.RETURNNULL; - } - } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { - returnType = ConsumeReturnType.TIME_OUT; - } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { - returnType = ConsumeReturnType.FAILED; - } else if (ConsumeOrderlyStatus.SUCCESS == status) { - returnType = ConsumeReturnType.SUCCESS; - } - consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); - if (null == status) { - status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; - } - - if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { - consumeMessageContext.setStatus(status.toString()); - consumeMessageContext - .setSuccess(ConsumeOrderlyStatus.SUCCESS 
== status || ConsumeOrderlyStatus.COMMIT == status); - ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); - } - - ConsumeMessageOrderlyService.this.getConsumerStatsManager() - .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); - - continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); - } else { - continueConsume = false; - } - } - } else { - if (this.processQueue.isDropped()) { - log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); - return; - } - - ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); - } - } - } - - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java deleted file mode 100644 index 1f7f0d9..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public interface ConsumeMessageService { - void start(); - - - void shutdown(); - - - void updateCorePoolSize(int corePoolSize); - - - void incCorePoolSize(); - - - void decCorePoolSize(); - - - int getCorePoolSize(); - - - ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName); - - - void submitConsumeRequest(// - final List<MessageExt> msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // - final boolean dispathToConsume); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java deleted file mode 100644 index 1785ec9..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ /dev/null @@ -1,706 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.client.QueryResult; -import com.alibaba.rocketmq.client.Validators; -import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; -import com.alibaba.rocketmq.client.consumer.PullCallback; -import com.alibaba.rocketmq.client.consumer.PullResult; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore; -import com.alibaba.rocketmq.client.consumer.store.OffsetStore; -import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType; -import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.hook.ConsumeMessageContext; -import com.alibaba.rocketmq.client.hook.ConsumeMessageHook; -import com.alibaba.rocketmq.client.hook.FilterMessageHook; -import com.alibaba.rocketmq.client.impl.CommunicationMode; -import com.alibaba.rocketmq.client.impl.MQClientManager; -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.log.ClientLogger; -import 
com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.ServiceState; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.filter.FilterAPI; -import com.alibaba.rocketmq.common.help.FAQUrl; -import com.alibaba.rocketmq.common.message.*; -import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; -import com.alibaba.rocketmq.common.sysflag.PullSysFlag; -import com.alibaba.rocketmq.remoting.RPCHook; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import org.slf4j.Logger; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * @author shijia.wxr - */ -public class DefaultMQPullConsumerImpl implements MQConsumerInner { - private final Logger log = ClientLogger.getLog(); - private final DefaultMQPullConsumer defaultMQPullConsumer; - private final long consumerStartTimestamp = System.currentTimeMillis(); - private final RPCHook rpcHook; - private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>(); - private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); - private ServiceState serviceState = ServiceState.CREATE_JUST; - private MQClientInstance mQClientFactory; - private PullAPIWrapper pullAPIWrapper; - private OffsetStore offsetStore; - private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this); - - - public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) { - this.defaultMQPullConsumer = defaultMQPullConsumer; - this.rpcHook = rpcHook; - } - - public void registerConsumeMessageHook(final 
ConsumeMessageHook hook) { - this.consumeMessageHookList.add(hook); - log.info("register consumeMessageHook Hook, {}", hook.hookName()); - } - - public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - createTopic(key, newTopic, queueNum, 0); - } - - public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { - this.makeSureStateOK(); - this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); - } - - private void makeSureStateOK() throws MQClientException { - if (this.serviceState != ServiceState.RUNNING) { - throw new MQClientException("The consumer service state not OK, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); - } - } - - public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException { - this.makeSureStateOK(); - return this.offsetStore.readOffset(mq, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE); - } - - public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException { - this.makeSureStateOK(); - if (null == topic) { - throw new IllegalArgumentException("topic is null"); - } - - ConcurrentHashMap<MessageQueue, ProcessQueue> mqTable = this.rebalanceImpl.getProcessQueueTable(); - Set<MessageQueue> mqResult = new HashSet<MessageQueue>(); - for (MessageQueue mq : mqTable.keySet()) { - if (mq.getTopic().equals(topic)) { - mqResult.add(mq); - } - } - - return mqResult; - } - - public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic); - } - - public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic); - } - - public long 
earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); - } - - public long maxOffset(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); - } - - public long minOffset(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().minOffset(mq); - } - - public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return pull(mq, subExpression, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis()); - } - - public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout); - } - - private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - this.makeSureStateOK(); - - if (null == mq) { - throw new MQClientException("mq is null", null); - - } - - if (offset < 0) { - throw new MQClientException("offset < 0", null); - } - - if (maxNums <= 0) { - throw new MQClientException("maxNums <= 0", null); - } - - this.subscriptionAutomatically(mq.getTopic()); - - int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); - - SubscriptionData subscriptionData; - try { - subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // - mq.getTopic(), subExpression); - } catch (Exception e) { - throw new MQClientException("parse subscription error", e); - } - - long timeoutMillis = block ? 
this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; - - PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(// - mq, // 1 - subscriptionData.getSubString(), // 2 - 0L, // 3 - offset, // 4 - maxNums, // 5 - sysFlag, // 6 - 0, // 7 - this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8 - timeoutMillis, // 9 - CommunicationMode.SYNC, // 10 - null// 11 - ); - this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); - if (!this.consumeMessageHookList.isEmpty()) { - ConsumeMessageContext consumeMessageContext = null; - consumeMessageContext = new ConsumeMessageContext(); - consumeMessageContext.setConsumerGroup(this.groupName()); - consumeMessageContext.setMq(mq); - consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); - consumeMessageContext.setSuccess(false); - this.executeHookBefore(consumeMessageContext); - consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); - consumeMessageContext.setSuccess(true); - this.executeHookAfter(consumeMessageContext); - } - return pullResult; - } - - public void subscriptionAutomatically(final String topic) { - if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) { - try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // - topic, SubscriptionData.SUB_ALL); - this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData); - } catch (Exception e) { - } - } - } - - public void unsubscribe(String topic) { - this.rebalanceImpl.getSubscriptionInner().remove(topic); - } - - @Override - public String groupName() { - return this.defaultMQPullConsumer.getConsumerGroup(); - } - - public void executeHookBefore(final ConsumeMessageContext context) { - if (!this.consumeMessageHookList.isEmpty()) { - for (ConsumeMessageHook hook : this.consumeMessageHookList) { - try { - hook.consumeMessageBefore(context); - } catch (Throwable e) { - } - } - } - } - - 
public void executeHookAfter(final ConsumeMessageContext context) { - if (!this.consumeMessageHookList.isEmpty()) { - for (ConsumeMessageHook hook : this.consumeMessageHookList) { - try { - hook.consumeMessageAfter(context); - } catch (Throwable e) { - } - } - } - } - - @Override - public MessageModel messageModel() { - return this.defaultMQPullConsumer.getMessageModel(); - } - - @Override - public ConsumeType consumeType() { - return ConsumeType.CONSUME_ACTIVELY; - } - - @Override - public ConsumeFromWhere consumeFromWhere() { - return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; - } - - @Override - public Set<SubscriptionData> subscriptions() { - Set<SubscriptionData> result = new HashSet<SubscriptionData>(); - - Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics(); - if (topics != null) { - synchronized (topics) { - for (String t : topics) { - SubscriptionData ms = null; - try { - ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL); - } catch (Exception e) { - log.error("parse subscription error", e); - } - ms.setSubVersion(0L); - result.add(ms); - } - } - } - - return result; - } - - @Override - public void doRebalance() { - if (this.rebalanceImpl != null) { - this.rebalanceImpl.doRebalance(false); - } - } - - @Override - public void persistConsumerOffset() { - try { - this.makeSureStateOK(); - Set<MessageQueue> mqs = new HashSet<MessageQueue>(); - Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); - if (allocateMq != null) { - mqs.addAll(allocateMq); - } - this.offsetStore.persistAll(mqs); - } catch (Exception e) { - log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); - } - } - - @Override - public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) { - Map<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner(); - if (subTable != null) { - if (subTable.containsKey(topic)) { - 
this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info); - } - } - } - - @Override - public boolean isSubscribeTopicNeedUpdate(String topic) { - Map<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner(); - if (subTable != null) { - if (subTable.containsKey(topic)) { - return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic); - } - } - - return false; - } - - @Override - public boolean isUnitMode() { - return this.defaultMQPullConsumer.isUnitMode(); - } - - @Override - public ConsumerRunningInfo consumerRunningInfo() { - ConsumerRunningInfo info = new ConsumerRunningInfo(); - - Properties prop = MixAll.object2Properties(this.defaultMQPullConsumer); - prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp)); - info.setProperties(prop); - - info.getSubscriptionSet().addAll(this.subscriptions()); - return info; - } - - public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) - throws MQClientException, RemotingException, InterruptedException { - pull(mq, subExpression, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis()); - } - - public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { - this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout); - } - - private void pullAsyncImpl(// - final MessageQueue mq, // - final String subExpression, // - final long offset, // - final int maxNums, // - final PullCallback pullCallback, // - final boolean block, // - final long timeout) throws MQClientException, RemotingException, InterruptedException { - this.makeSureStateOK(); - - if (null == mq) { - throw new MQClientException("mq is null", null); - } - - if (offset < 0) { - throw new MQClientException("offset < 0", null); - } - - if 
(maxNums <= 0) { - throw new MQClientException("maxNums <= 0", null); - } - - if (null == pullCallback) { - throw new MQClientException("pullCallback is null", null); - } - - this.subscriptionAutomatically(mq.getTopic()); - - try { - int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); - - final SubscriptionData subscriptionData; - try { - subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // - mq.getTopic(), subExpression); - } catch (Exception e) { - throw new MQClientException("parse subscription error", e); - } - - long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; - - this.pullAPIWrapper.pullKernelImpl(// - mq, // 1 - subscriptionData.getSubString(), // 2 - 0L, // 3 - offset, // 4 - maxNums, // 5 - sysFlag, // 6 - 0, // 7 - this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8 - timeoutMillis, // 9 - CommunicationMode.ASYNC, // 10 - new PullCallback() { - - @Override - public void onSuccess(PullResult pullResult) { - pullCallback - .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData)); - } - - @Override - public void onException(Throwable e) { - pullCallback.onException(e); - } - }); - } catch (MQBrokerException e) { - throw new MQClientException("pullAsync unknow exception", e); - } - } - - public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); - } - - public DefaultMQPullConsumer getDefaultMQPullConsumer() { - return defaultMQPullConsumer; - } - - public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) - throws MQClientException, 
RemotingException, InterruptedException { - this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true, - this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); - } - - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); - } - - public MessageExt queryMessageByUniqKey(String topic, String uniqKey) - throws MQClientException, InterruptedException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); - } - - public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); - } - - public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup()); - } - - public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException { - this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway); - } - - public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - try { - String brokerAddr = (null != brokerName) ? 
this.mQClientFactory.findBrokerAddressInPublish(brokerName) - : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); - - if (UtilAll.isBlank(consumerGroup)) { - consumerGroup = this.defaultMQPullConsumer.getConsumerGroup(); - } - - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000, - this.defaultMQPullConsumer.getMaxReconsumeTimes()); - } catch (Exception e) { - log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e); - - Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()), msg.getBody()); - String originMsgId = MessageAccessor.getOriginMessageId(msg); - MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); - newMsg.setFlag(msg.getFlag()); - MessageAccessor.setProperties(newMsg, msg.getProperties()); - MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); - MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); - MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes())); - newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); - this.mQClientFactory.getDefaultMQProducer().send(newMsg); - } - } - - public void shutdown() { - switch (this.serviceState) { - case CREATE_JUST: - break; - case RUNNING: - this.persistConsumerOffset(); - this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup()); - this.mQClientFactory.shutdown(); - log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup()); - this.serviceState = ServiceState.SHUTDOWN_ALREADY; - break; - case SHUTDOWN_ALREADY: - break; - default: - break; - } - } - - public void start() throws MQClientException { - switch (this.serviceState) { - case CREATE_JUST: - this.serviceState = ServiceState.START_FAILED; - - this.checkConfig(); - - 
this.copySubscription(); - - if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) { - this.defaultMQPullConsumer.changeInstanceNameToPID(); - } - - this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); - - this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup()); - this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel()); - this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()); - this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); - - this.pullAPIWrapper = new PullAPIWrapper(// - mQClientFactory, // - this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode()); - this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); - - if (this.defaultMQPullConsumer.getOffsetStore() != null) { - this.offsetStore = this.defaultMQPullConsumer.getOffsetStore(); - } else { - switch (this.defaultMQPullConsumer.getMessageModel()) { - case BROADCASTING: - this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup()); - break; - case CLUSTERING: - this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup()); - break; - default: - break; - } - } - - this.offsetStore.load(); - - boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this); - if (!registerOK) { - this.serviceState = ServiceState.CREATE_JUST; - - throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup() - + "] has been created before, specify another name please." 
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), - null); - } - - mQClientFactory.start(); - log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup()); - this.serviceState = ServiceState.RUNNING; - break; - case RUNNING: - case START_FAILED: - case SHUTDOWN_ALREADY: - throw new MQClientException("The PullConsumer service state not OK, maybe started once, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); - default: - break; - } - } - - private void checkConfig() throws MQClientException { - // check consumerGroup - Validators.checkGroup(this.defaultMQPullConsumer.getConsumerGroup()); - - // consumerGroup - if (null == this.defaultMQPullConsumer.getConsumerGroup()) { - throw new MQClientException( - "consumerGroup is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // - null); - } - - // consumerGroup - if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { - throw new MQClientException( - "consumerGroup can not equal "// - + MixAll.DEFAULT_CONSUMER_GROUP // - + ", please specify another one."// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // - null); - } - - // messageModel - if (null == this.defaultMQPullConsumer.getMessageModel()) { - throw new MQClientException( - "messageModel is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // - null); - } - - // allocateMessageQueueStrategy - if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) { - throw new MQClientException( - "allocateMessageQueueStrategy is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // - null); - } - - // allocateMessageQueueStrategy - if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) { - throw new MQClientException( - "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than 
brokerSuspendMaxTimeMillis" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // - null); - } - } - - private void copySubscription() throws MQClientException { - try { - Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics(); - if (registerTopics != null) { - for (final String topic : registerTopics) { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // - topic, SubscriptionData.SUB_ALL); - this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); - } - } - } catch (Exception e) { - throw new MQClientException("subscription exception", e); - } - } - - public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException { - this.makeSureStateOK(); - this.offsetStore.updateOffset(mq, offset, false); - } - - public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); - } - - public void registerFilterMessageHook(final FilterMessageHook hook) { - this.filterMessageHookList.add(hook); - log.info("register FilterMessageHook Hook, {}", hook.hookName()); - } - - public OffsetStore getOffsetStore() { - return offsetStore; - } - - public void setOffsetStore(OffsetStore offsetStore) { - this.offsetStore = offsetStore; - } - - public PullAPIWrapper getPullAPIWrapper() { - return pullAPIWrapper; - } - - public void setPullAPIWrapper(PullAPIWrapper pullAPIWrapper) { - this.pullAPIWrapper = pullAPIWrapper; - } - - public ServiceState getServiceState() { - return serviceState; - } - - public void setServiceState(ServiceState serviceState) { - this.serviceState = serviceState; - } - - public long getConsumerStartTimestamp() { - return consumerStartTimestamp; - } - - - public RebalanceImpl getRebalanceImpl() { - return rebalanceImpl; - } -}