http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java deleted file mode 100644 index 1125d09..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java +++ /dev/null @@ -1,229 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer; - -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.remoting.exception.RemotingException; - -import java.util.Set; - - -/** - * Pulling consumer interface - * - * @author shijia.wxr - */ -public interface MQPullConsumer extends MQConsumer { - /** - * Start the consumer - * - * @throws MQClientException - */ - void start() throws MQClientException; - - - /** - * Shutdown the consumer - */ - void shutdown(); - - - /** - * Register the message queue listener - * - * @param topic - * @param listener - */ - void registerMessageQueueListener(final String topic, final MessageQueueListener listener); - - - /** - * Pulling the messages,not blocking - * - * @param mq - * from which message queue - * @param subExpression - * subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> - * if null or * expression,meaning subscribe all - * @param offset - * from where to pull - * @param maxNums - * max pulling numbers - * - * @return The resulting {@code PullRequest} - * - * @throws MQClientException - * @throws InterruptedException - * @throws MQBrokerException - * @throws RemotingException - */ - PullResult pull(final MessageQueue mq, final String subExpression, final long offset, - final int maxNums) throws MQClientException, RemotingException, MQBrokerException, - InterruptedException; - - - /** - * Pulling the messages in the specified timeout - * - * @param mq - * @param subExpression - * @param offset - * @param maxNums - * @param timeout - * - * @return The resulting {@code PullRequest} - * - * @throws MQClientException - * @throws RemotingException - * @throws MQBrokerException - * @throws InterruptedException - */ - PullResult pull(final MessageQueue mq, final String subExpression, final long offset, - final int maxNums, final long timeout) throws MQClientException, RemotingException, - MQBrokerException, InterruptedException; - - - /** - * Pulling the messages in a async. way - * - * @param mq - * @param subExpression - * @param offset - * @param maxNums - * @param pullCallback - * - * @throws MQClientException - * @throws RemotingException - * @throws InterruptedException - */ - void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums, - final PullCallback pullCallback) throws MQClientException, RemotingException, - InterruptedException; - - /** - * Pulling the messages in a async. way - * - * @param mq - * @param subExpression - * @param offset - * @param maxNums - * @param pullCallback - * @param timeout - * - * @throws MQClientException - * @throws RemotingException - * @throws InterruptedException - */ - void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums, - final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, - InterruptedException; - - - /** - * Pulling the messages,if no message arrival,blocking some time - * - * @param mq - * @param subExpression - * @param offset - * @param maxNums - * - * @return The resulting {@code PullRequest} - * - * @throws MQClientException - * @throws RemotingException - * @throws MQBrokerException - * @throws InterruptedException - */ - PullResult pullBlockIfNotFound(final MessageQueue mq, final String subExpression, - final long offset, final int maxNums) throws MQClientException, RemotingException, - MQBrokerException, InterruptedException; - - - /** - * Pulling the messages through callback function,if no message arrival,blocking. - * - * @param mq - * @param subExpression - * @param offset - * @param maxNums - * @param pullCallback - * - * @throws MQClientException - * @throws RemotingException - * @throws InterruptedException - */ - void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset, - final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException, - InterruptedException; - - - /** - * Update the offset - * - * @param mq - * @param offset - * - * @throws MQClientException - */ - void updateConsumeOffset(final MessageQueue mq, final long offset) throws MQClientException; - - - /** - * Fetch the offset - * - * @param mq - * @param fromStore - * - * @return The fetched offset of given queue - * - * @throws MQClientException - */ - long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) throws MQClientException; - - - /** - * Fetch the message queues according to the topic - * - * @param topic - * message topic - * - * @return message queue set - * - * @throws MQClientException - */ - Set<MessageQueue> fetchMessageQueuesInBalance(final String topic) throws MQClientException; - - /** - * If consuming failure,message will be send back to the broker,and delay consuming in some time later.<br> - * Mind! message can only be consumed in the same group. - * - * @param msg - * @param delayLevel - * @param brokerName - * @param consumerGroup - * - * @throws RemotingException - * @throws MQBrokerException - * @throws InterruptedException - * @throws MQClientException - */ - void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException; -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java deleted file mode 100644 index d68b559..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer; - -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.ThreadFactoryImpl; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import org.slf4j.Logger; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - - -/** - * Schedule service for pull consumer - * - * @author shijia.wxr - */ -public class MQPullConsumerScheduleService { - private final Logger log = ClientLogger.getLog(); - private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl(); - private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable = - new ConcurrentHashMap<MessageQueue, PullTaskImpl>(); - private DefaultMQPullConsumer defaultMQPullConsumer; - private int pullThreadNums = 20; - private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable = - new ConcurrentHashMap<String, PullTaskCallback>(); - private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; - - public MQPullConsumerScheduleService(final String consumerGroup) { - this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup); - this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); - } - - public void putTask(String topic, Set<MessageQueue> mqNewSet) { - Iterator<Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<MessageQueue, PullTaskImpl> next = it.next(); - if (next.getKey().getTopic().equals(topic)) { - if (!mqNewSet.contains(next.getKey())) { - next.getValue().setCancelled(true); - it.remove(); - } - } - } - - for (MessageQueue mq : mqNewSet) { - if (!this.taskTable.containsKey(mq)) { - PullTaskImpl command = new PullTaskImpl(mq); - this.taskTable.put(mq, command); - this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS); - - } - } - } - - public void start() throws MQClientException { - final String group = this.defaultMQPullConsumer.getConsumerGroup(); - this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( - this.pullThreadNums, - new ThreadFactoryImpl("PullMsgThread-" + group) - ); - - this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener); - - this.defaultMQPullConsumer.start(); - - log.info("MQPullConsumerScheduleService start OK, {} {}", - this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable); - } - - public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) { - this.callbackTable.put(topic, callback); - this.defaultMQPullConsumer.registerMessageQueueListener(topic, null); - } - - public void shutdown() { - if (this.scheduledThreadPoolExecutor != null) { - this.scheduledThreadPoolExecutor.shutdown(); - } - - if (this.defaultMQPullConsumer != null) { - this.defaultMQPullConsumer.shutdown(); - } - } - - public ConcurrentHashMap<String, PullTaskCallback> getCallbackTable() { - return callbackTable; - } - - public void setCallbackTable(ConcurrentHashMap<String, PullTaskCallback> callbackTable) { - this.callbackTable = callbackTable; - } - - public int getPullThreadNums() { - return pullThreadNums; - } - - public void setPullThreadNums(int pullThreadNums) { - this.pullThreadNums = pullThreadNums; - } - - public DefaultMQPullConsumer getDefaultMQPullConsumer() { - return defaultMQPullConsumer; - } - - public void setDefaultMQPullConsumer(DefaultMQPullConsumer defaultMQPullConsumer) { - this.defaultMQPullConsumer = defaultMQPullConsumer; - } - - public MessageModel getMessageModel() { - return this.defaultMQPullConsumer.getMessageModel(); - } - - public void setMessageModel(MessageModel messageModel) { - this.defaultMQPullConsumer.setMessageModel(messageModel); - } - - class MessageQueueListenerImpl implements MessageQueueListener { - @Override - public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { - MessageModel messageModel = - MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel(); - switch (messageModel) { - case BROADCASTING: - MQPullConsumerScheduleService.this.putTask(topic, mqAll); - break; - case CLUSTERING: - MQPullConsumerScheduleService.this.putTask(topic, mqDivided); - break; - default: - break; - } - } - } - - class PullTaskImpl implements Runnable { - private final MessageQueue messageQueue; - private volatile boolean cancelled = false; - - - public PullTaskImpl(final MessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - - - @Override - public void run() { - String topic = this.messageQueue.getTopic(); - if (!this.isCancelled()) { - PullTaskCallback pullTaskCallback = - MQPullConsumerScheduleService.this.callbackTable.get(topic); - if (pullTaskCallback != null) { - final PullTaskContext context = new PullTaskContext(); - context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer); - try { - pullTaskCallback.doPullTask(this.messageQueue, context); - } catch (Throwable e) { - context.setPullNextDelayTimeMillis(1000); - log.error("doPullTask Exception", e); - } - - if (!this.isCancelled()) { - MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this, - context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS); - } else { - log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue); - } - } else { - log.warn("Pull Task Callback not exist , {}", topic); - } - } else { - log.warn("The Pull Task is cancelled, {}", messageQueue); - } - } - - - public boolean isCancelled() { - return cancelled; - } - - - public void setCancelled(boolean cancelled) { - this.cancelled = cancelled; - } - - - public MessageQueue getMessageQueue() { - return messageQueue; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java deleted file mode 100644 index e47739d..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer; - -import com.alibaba.rocketmq.client.consumer.listener.MessageListener; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; -import com.alibaba.rocketmq.client.exception.MQClientException; - - -/** - * Push consumer - * - * @author shijia.wxr - */ -public interface MQPushConsumer extends MQConsumer { - /** - * Start the consumer - * - * @throws MQClientException - */ - void start() throws MQClientException; - - - /** - * Shutdown the consumer - */ - void shutdown(); - - - /** - * Register the message listener - * - * @param messageListener - */ - @Deprecated - void registerMessageListener(MessageListener messageListener); - - - void registerMessageListener(final MessageListenerConcurrently messageListener); - - - void registerMessageListener(final MessageListenerOrderly messageListener); - - - /** - * Subscribe some topic - * - * @param topic - * @param subExpression - * subscription expression.it only support or operation such as - * "tag1 || tag2 || tag3" <br> - * if null or * expression,meaning subscribe all - * - * @throws MQClientException - */ - void subscribe(final String topic, final String subExpression) throws MQClientException; - - - /** - * Subscribe some topic - * - * @param topic - * @param fullClassName - * full class name,must extend - * com.alibaba.rocketmq.common.filter. MessageFilter - * @param filterClassSource - * class source code,used UTF-8 file encoding,must be responsible - * for your code safety - * - * @throws MQClientException - */ - void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException; - - - /** - * Unsubscribe consumption some topic - * - * @param topic - * message topic - */ - void unsubscribe(final String topic); - - - /** - * Update the consumer thread pool size Dynamically - * - * @param corePoolSize - */ - void updateCorePoolSize(int corePoolSize); - - - /** - * Suspend the consumption - */ - void suspend(); - - - /** - * Resume the consumption - */ - void resume(); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java deleted file mode 100644 index bb25a3a..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer; - -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.Set; - - -/** - * A MessageQueueListener is implemented by the application and may be specified when a message queue changed - * - * @author shijia.wxr - * @author vongosling - */ -public interface MessageQueueListener { - /** - * @param topic - * message topic - * @param mqAll - * all queues in this message topic - * @param mqDivided - * collection of queues,assigned to the current consumer - */ - void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, - final Set<MessageQueue> mqDivided); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java deleted file mode 100644 index 545cff2..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer; - -/** - * Async message pulling interface - * - * @author shijia.wxr - */ -public interface PullCallback { - public void onSuccess(final PullResult pullResult); - - public void onException(final Throwable e); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java deleted file mode 100644 index b485243..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer; - -import com.alibaba.rocketmq.common.message.MessageExt; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class PullResult { - private final PullStatus pullStatus; - private final long nextBeginOffset; - private final long minOffset; - private final long maxOffset; - private List<MessageExt> msgFoundList; - - - public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset, - List<MessageExt> msgFoundList) { - super(); - this.pullStatus = pullStatus; - this.nextBeginOffset = nextBeginOffset; - this.minOffset = minOffset; - this.maxOffset = maxOffset; - this.msgFoundList = msgFoundList; - } - - - public PullStatus getPullStatus() { - return pullStatus; - } - - - public long getNextBeginOffset() { - return nextBeginOffset; - } - - - public long getMinOffset() { - return minOffset; - } - - - public long getMaxOffset() { - return maxOffset; - } - - - public List<MessageExt> getMsgFoundList() { - return msgFoundList; - } - - - public void setMsgFoundList(List<MessageExt> msgFoundList) { - this.msgFoundList = msgFoundList; - } - - - @Override - public String toString() { - return "PullResult [pullStatus=" + pullStatus + ", nextBeginOffset=" + nextBeginOffset - + ", minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", msgFoundList=" - + (msgFoundList == null ? 0 : msgFoundList.size()) + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java deleted file mode 100644 index 35166f3..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer; - -/** - * @author shijia.wxr - */ -public enum PullStatus { - /** - * Founded - */ - FOUND, - /** - * No new message can be pull - */ - NO_NEW_MSG, - /** - * Filtering results can not match - */ - NO_MATCHED_MSG, - /** - * Illegal offset,may be too big or too small - */ - OFFSET_ILLEGAL -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java deleted file mode 100644 index 19d5bfc..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer; - -import com.alibaba.rocketmq.common.message.MessageQueue; - - -public interface PullTaskCallback { - public void doPullTask(final MessageQueue mq, final PullTaskContext context); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java deleted file mode 100644 index 72c57d6..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer; - -public class PullTaskContext { - - private int pullNextDelayTimeMillis = 200; - - private MQPullConsumer pullConsumer; - - - public int getPullNextDelayTimeMillis() { - return pullNextDelayTimeMillis; - } - - - public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) { - this.pullNextDelayTimeMillis = pullNextDelayTimeMillis; - } - - - public MQPullConsumer getPullConsumer() { - return pullConsumer; - } - - - public void setPullConsumer(MQPullConsumer pullConsumer) { - this.pullConsumer = pullConsumer; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java deleted file mode 100644 index 36fcf19..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.listener; - -import com.alibaba.rocketmq.common.message.MessageQueue; - - -/** - * Consumer concurrent consumption context - * - * @author shijia.wxr - */ -public class ConsumeConcurrentlyContext { - private final MessageQueue messageQueue; - /** - * Message consume retry strategy<br> - * -1,no retry,put into DLQ directly<br> - * 0,broker control retry frequency<br> - * >0,client control retry frequency - */ - private int delayLevelWhenNextConsume = 0; - private int ackIndex = Integer.MAX_VALUE; - - public ConsumeConcurrentlyContext(MessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - - - public int getDelayLevelWhenNextConsume() { - return delayLevelWhenNextConsume; - } - - - public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) { - this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; - } - - - public MessageQueue getMessageQueue() { - return messageQueue; - } - - - public int getAckIndex() { - return ackIndex; - } - - - public void setAckIndex(int ackIndex) { - this.ackIndex = ackIndex; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java deleted file mode 100644 index d0d3bf4..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.listener; - -/** - * @author shijia.wxr - */ -public enum ConsumeConcurrentlyStatus { - /** - * Success consumption - */ - CONSUME_SUCCESS, - /** - * Failure consumption,later try to consume - */ - RECONSUME_LATER; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java deleted file mode 100644 index 26a3892..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.listener; - -import com.alibaba.rocketmq.common.message.MessageQueue; - - -/** - * Consumer Orderly consumption context - * - * @author shijia.wxr - */ -public class ConsumeOrderlyContext { - private final MessageQueue messageQueue; - private boolean autoCommit = true; - private long suspendCurrentQueueTimeMillis = -1; - - - public ConsumeOrderlyContext(MessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - - - public boolean isAutoCommit() { - return autoCommit; - } - - - public void setAutoCommit(boolean autoCommit) { - this.autoCommit = autoCommit; - } - - - public MessageQueue getMessageQueue() { - return messageQueue; - } - - - public long getSuspendCurrentQueueTimeMillis() { - return suspendCurrentQueueTimeMillis; - } - - - public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { - this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java deleted file mode 100644 index e490c5c..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.listener; - -/** - * @author shijia.wxr - */ -public enum ConsumeOrderlyStatus { - /** - * Success consumption - */ - SUCCESS, - /** - * Rollback consumption(only for binlog consumption) - */ - @Deprecated - ROLLBACK, - /** - * Commit offset(only for binlog consumption) - */ - @Deprecated - COMMIT, - /** - * Suspend current queue a moment - */ - SUSPEND_CURRENT_QUEUE_A_MOMENT; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java deleted file mode 100644 index 44f998e..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.client.consumer.listener; - -/** - * Created by alvin on 16-11-30. - */ -public enum ConsumeReturnType { - /** - * consume return success - */ - SUCCESS, - /** - * consume timeout ,even if success - */ - TIME_OUT, - /** - * consume throw exception - */ - EXCEPTION, - /** - * consume return null - */ - RETURNNULL, - /** - * consume return failed - */ - FAILED -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java deleted file mode 100644 index f34946e..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.listener; - -/** - * A MessageListener object is used to receive asynchronously delivered messages. - * - * @author shijia.wxr - */ -public interface MessageListener { -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java deleted file mode 100644 index f0b0c61..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.listener; - -import com.alibaba.rocketmq.common.message.MessageExt; - -import java.util.List; - - -/** - * A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently - * - * @author shijia.wxr - */ -public interface MessageListenerConcurrently extends MessageListener { - /** - * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure - * - * @param msgs - * msgs.size() >= 1<br> - * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here - * @param context - * - * @return The consume status - */ - ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, - final ConsumeConcurrentlyContext context); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java deleted file mode 100644 index d30cdfa..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.listener; - -import com.alibaba.rocketmq.common.message.MessageExt; - -import java.util.List; - - -/** - * A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one thread - * - * @author shijia.wxr - */ -public interface MessageListenerOrderly extends MessageListener { - /** - * It is not recommend to throw exception,rather than returning ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT if consumption failure - * - * @param msgs - * msgs.size() >= 1<br> - * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here - * @param context - * - * @return The consume status - */ - ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs, - final ConsumeOrderlyContext context); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java deleted file mode 100644 index 413d646..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.rebalance; - -import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.message.MessageQueue; -import org.slf4j.Logger; - -import java.util.ArrayList; -import java.util.List; - - -/** - * Average Hashing queue algorithm - * - * @author manhong.yqd - */ -public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { - private final Logger log = ClientLogger.getLog(); - - @Override - public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, - List<String> cidAll) { - if (currentCID == null || currentCID.length() < 1) { - throw new IllegalArgumentException("currentCID is empty"); - } - if (mqAll == null || mqAll.isEmpty()) { - throw new IllegalArgumentException("mqAll is null or mqAll empty"); - } - if (cidAll == null || cidAll.isEmpty()) { - throw new IllegalArgumentException("cidAll is null or cidAll empty"); - } - - List<MessageQueue> result = new ArrayList<MessageQueue>(); - if (!cidAll.contains(currentCID)) { - log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", - consumerGroup, - currentCID, - cidAll); - return result; - } - - int index = cidAll.indexOf(currentCID); - int mod = mqAll.size() % cidAll.size(); - int averageSize = - mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() - + 1 : mqAll.size() / cidAll.size()); - int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; - int range = Math.min(averageSize, mqAll.size() - startIndex); - for (int i = 0; i < range; i++) { - result.add(mqAll.get((startIndex + i) % mqAll.size())); - } - return result; - } - - @Override - public String getName() { - return "AVG"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java deleted file mode 100644 index 17f4611..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.rebalance; - -import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.message.MessageQueue; -import org.slf4j.Logger; - -import java.util.ArrayList; -import java.util.List; - - -/** - * Cycle average Hashing queue algorithm - * - * @author manhong.yqd - */ -public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy { - private final Logger log = ClientLogger.getLog(); - - @Override - public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, - List<String> cidAll) { - if (currentCID == null || currentCID.length() < 1) { - throw new IllegalArgumentException("currentCID is empty"); - } - if (mqAll == null || mqAll.isEmpty()) { - throw new IllegalArgumentException("mqAll is null or mqAll empty"); - } - if (cidAll == null || cidAll.isEmpty()) { - throw new IllegalArgumentException("cidAll is null or cidAll empty"); - } - - List<MessageQueue> result = new ArrayList<MessageQueue>(); - if (!cidAll.contains(currentCID)) { - log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", - consumerGroup, - currentCID, - cidAll); - return result; - } - - int index = cidAll.indexOf(currentCID); - for (int i = index; i < mqAll.size(); i++) { - if (i % cidAll.size() == index) { - result.add(mqAll.get(i)); - } - } - return result; - } - - @Override - public String getName() { - return "AVG_BY_CIRCLE"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java deleted file mode 100644 index 783678c..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.rebalance; - -import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy { - private List<MessageQueue> messageQueueList; - - @Override - public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, - List<String> cidAll) { - return this.messageQueueList; - } - - @Override - public String getName() { - return "CONFIG"; - } - - public List<MessageQueue> getMessageQueueList() { - return messageQueueList; - } - - - public void setMessageQueueList(List<MessageQueue> messageQueueList) { - this.messageQueueList = messageQueueList; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java deleted file mode 100644 index 5464fe3..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.rebalance; - -import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - - -/** - * Computer room Hashing queue algorithm, such as Alipay logic room - */ -public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy { - private Set<String> consumeridcs; - - @Override - public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, - List<String> cidAll) { - List<MessageQueue> result = new ArrayList<MessageQueue>(); - int currentIndex = cidAll.indexOf(currentCID); - if (currentIndex < 0) { - return result; - } - List<MessageQueue> premqAll = new ArrayList<MessageQueue>(); - for (MessageQueue mq : mqAll) { - String[] temp = mq.getBrokerName().split("@"); - if (temp.length == 2 && consumeridcs.contains(temp[0])) { - premqAll.add(mq); - } - } - // Todo cid - int mod = premqAll.size() / cidAll.size(); - int rem = premqAll.size() % cidAll.size(); - int startindex = mod * currentIndex; - int endindex = startindex + mod; - for (int i = startindex; i < endindex; i++) { - result.add(mqAll.get(i)); - } - if (rem > currentIndex) { - result.add(premqAll.get(currentIndex + mod * cidAll.size())); - } - return result; - } - - @Override - public String getName() { - return "MACHINE_ROOM"; - } - - public Set<String> getConsumeridcs() { - return consumeridcs; - } - - - public void setConsumeridcs(Set<String> consumeridcs) { - this.consumeridcs = consumeridcs; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/LocalFileOffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/LocalFileOffsetStore.java deleted file mode 100644 index 39aec12..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.store; - -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.help.FAQUrl; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import org.slf4j.Logger; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - - -/** - * Local storage implementation - * - * @author shijia.wxr - */ -public class LocalFileOffsetStore implements OffsetStore { - public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty( - "rocketmq.client.localOffsetStoreDir", - System.getProperty("user.home") + File.separator + ".rocketmq_offsets"); - private final static Logger log = ClientLogger.getLog(); - private final MQClientInstance mQClientFactory; - private final String groupName; - private final String storePath; - private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = - new ConcurrentHashMap<MessageQueue, AtomicLong>(); - - - public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) { - this.mQClientFactory = mQClientFactory; - this.groupName = groupName; - this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + // - this.mQClientFactory.getClientId() + File.separator + // - this.groupName + File.separator + // - "offsets.json"; - } - - - @Override - public void load() throws MQClientException { - OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset(); - if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) { - offsetTable.putAll(offsetSerializeWrapper.getOffsetTable()); - - for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) { - AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); - log.info("load consumer's offset, {} {} {}", - this.groupName, - mq, - offset.get()); - } - } - } - - - @Override - public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { - if (mq != null) { - AtomicLong offsetOld = this.offsetTable.get(mq); - if (null == offsetOld) { - offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); - } - - if (null != offsetOld) { - if (increaseOnly) { - MixAll.compareAndIncreaseOnly(offsetOld, offset); - } else { - offsetOld.set(offset); - } - } - } - } - - - @Override - public long readOffset(final MessageQueue mq, final ReadOffsetType type) { - if (mq != null) { - switch (type) { - case MEMORY_FIRST_THEN_STORE: - case READ_FROM_MEMORY: { - AtomicLong offset = this.offsetTable.get(mq); - if (offset != null) { - return offset.get(); - } else if (ReadOffsetType.READ_FROM_MEMORY == type) { - return -1; - } - } - case READ_FROM_STORE: { - OffsetSerializeWrapper offsetSerializeWrapper; - try { - offsetSerializeWrapper = this.readLocalOffset(); - } catch (MQClientException e) { - return -1; - } - if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) { - AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); - if (offset != null) { - this.updateOffset(mq, offset.get(), false); - return offset.get(); - } - } - } - default: - break; - } - } - - return -1; - } - - - @Override - public void persistAll(Set<MessageQueue> mqs) { - if (null == mqs || mqs.isEmpty()) - return; - - OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper(); - for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { - if (mqs.contains(entry.getKey())) { - AtomicLong offset = entry.getValue(); - offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset); - } - } - - String jsonString = offsetSerializeWrapper.toJson(true); - if (jsonString != null) { - try { - MixAll.string2File(jsonString, this.storePath); - } catch (IOException e) { - log.error("persistAll consumer offset Exception, " + this.storePath, e); - } - } - } - - - @Override - public void persist(MessageQueue mq) { - } - - @Override - public void removeOffset(MessageQueue mq) { - - } - - @Override - public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - - } - - @Override - public Map<MessageQueue, Long> cloneOffsetTable(String topic) { - Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>(); - for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { - MessageQueue mq = entry.getKey(); - if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) { - continue; - } - cloneOffsetTable.put(mq, entry.getValue().get()); - - } - return cloneOffsetTable; - } - - private OffsetSerializeWrapper readLocalOffset() throws MQClientException { - String content = MixAll.file2String(this.storePath); - if (null == content || content.length() == 0) { - return this.readLocalOffsetBak(); - } else { - OffsetSerializeWrapper offsetSerializeWrapper = null; - try { - offsetSerializeWrapper = - OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); - } catch (Exception e) { - log.warn("readLocalOffset Exception, and try to correct", e); - return this.readLocalOffsetBak(); - } - - return offsetSerializeWrapper; - } - } - - private OffsetSerializeWrapper readLocalOffsetBak() throws MQClientException { - String content = MixAll.file2String(this.storePath + ".bak"); - if (content != null && content.length() > 0) { - OffsetSerializeWrapper offsetSerializeWrapper = null; - try { - offsetSerializeWrapper = - OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); - } catch (Exception e) { - log.warn("readLocalOffset Exception", e); - throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" // - + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), // - e); - } - return offsetSerializeWrapper; - } - - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetSerializeWrapper.java deleted file mode 100644 index 4434b86..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetSerializeWrapper.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.store; - -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - - -/** - * Wrapper class for offset serialization - * - * @author shijia.wxr - */ -public class OffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = - new ConcurrentHashMap<MessageQueue, AtomicLong>(); - - public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() { - return offsetTable; - } - - public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) { - this.offsetTable = offsetTable; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetStore.java deleted file mode 100644 index 346beb1..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetStore.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.store; - -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.remoting.exception.RemotingException; - -import java.util.Map; -import java.util.Set; - - -/** - * Offset store interface - * - * @author shijia.wxr - */ -public interface OffsetStore { - /** - * Load - * - * @throws MQClientException - */ - void load() throws MQClientException; - - - /** - * Update the offset,store it in memory - * - * @param mq - * @param offset - * @param increaseOnly - */ - void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly); - - /** - * Get offset from local storage - * - * @param mq - * @param type - * - * @return The fetched offset - */ - long readOffset(final MessageQueue mq, final ReadOffsetType type); - - /** - * Persist all offsets,may be in local storage or remote name server - * - * @param mqs - */ - void persistAll(final Set<MessageQueue> mqs); - - /** - * Persist the offset,may be in local storage or remote name server - * - * @param mq - */ - void persist(final MessageQueue mq); - - /** - * Remove offset - * - * @param mq - */ - void removeOffset(MessageQueue mq); - - /** - * @param topic - * - * @return The cloned offset table of given topic - */ - Map<MessageQueue, Long> cloneOffsetTable(String topic); - - /** - * - * @param mq - * @param offset - * @param isOneway - */ - void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/ReadOffsetType.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/ReadOffsetType.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/ReadOffsetType.java deleted file mode 100644 index 3691a62..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/ReadOffsetType.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.store; - -public enum ReadOffsetType { - /** - * From memory - */ - READ_FROM_MEMORY, - /** - * From storage - */ - READ_FROM_STORE, - /** - * From memory,then from storage - */ - MEMORY_FIRST_THEN_STORE; -}