http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java deleted file mode 100644 index 7150513..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.example.benchmark; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.srvutil.ServerUtil; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; - -import java.util.LinkedList; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.atomic.AtomicLong; - -public class Consumer { - - public static void main(String[] args) throws MQClientException { - Options options = ServerUtil.buildCommandlineOptions(new Options()); - CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser()); - if (null == commandLine) { - System.exit(-1); - } - - final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; - final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer"; - final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true"; - String group = groupPrefix; - if (Boolean.parseBoolean(isPrefixEnable)) { - group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100); - } - - System.out.printf("topic %s group %s prefix %s%n", topic, group, isPrefixEnable); - - final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); - - final Timer timer = new Timer("BenchmarkTimerThread", true); - - final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>(); - - timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - snapshotList.addLast(statsBenchmarkConsumer.createSnapshot()); - if (snapshotList.size() > 10) { - snapshotList.removeFirst(); - } - } - }, 1000, 1000); - - timer.scheduleAtFixedRate(new TimerTask() { - private void printStats() { - if (snapshotList.size() >= 10) { - Long[] begin = snapshotList.getFirst(); - Long[] end = snapshotList.getLast(); - - final long consumeTps = - (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L); - final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]); - final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]); - - System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n", - consumeTps, averageB2CRT, averageS2CRT, end[4], end[5] - ); - } - } - - - @Override - public void run() { - try { - this.printStats(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, 10000, 10000); - - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); - consumer.setInstanceName(Long.toString(System.currentTimeMillis())); - - consumer.subscribe(topic, "*"); - - consumer.registerMessageListener(new MessageListenerConcurrently() { - @Override - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { - MessageExt msg = msgs.get(0); - long now = System.currentTimeMillis(); - - statsBenchmarkConsumer.getReceiveMessageTotalCount().incrementAndGet(); - - long born2ConsumerRT = now - msg.getBornTimestamp(); - statsBenchmarkConsumer.getBorn2ConsumerTotalRT().addAndGet(born2ConsumerRT); - - long store2ConsumerRT = now - msg.getStoreTimestamp(); - statsBenchmarkConsumer.getStore2ConsumerTotalRT().addAndGet(store2ConsumerRT); - - compareAndSetMax(statsBenchmarkConsumer.getBorn2ConsumerMaxRT(), born2ConsumerRT); - - compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT); - - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - }); - - consumer.start(); - - System.out.printf("Consumer Started.%n"); - } - - public static Options buildCommandlineOptions(final Options options) { - Option opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("g", "group", true, "Consumer group name, Default: benchmark_consumer"); - opt.setRequired(false); - options.addOption(opt); - - - opt = new Option("p", "group prefix enable", true, "Consumer group name, Default: false"); - opt.setRequired(false); - options.addOption(opt); - - return options; - } - - - public static void compareAndSetMax(final AtomicLong target, final long value) { - long prev = target.get(); - while (value > prev) { - boolean updated = target.compareAndSet(prev, value); - if (updated) - break; - - prev = target.get(); - } - } -} - - -class StatsBenchmarkConsumer { - private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L); - - private final AtomicLong born2ConsumerTotalRT = new AtomicLong(0L); - - private final AtomicLong store2ConsumerTotalRT = new AtomicLong(0L); - - private final AtomicLong born2ConsumerMaxRT = new AtomicLong(0L); - - private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L); - - - public Long[] createSnapshot() { - Long[] snap = new Long[]{ - System.currentTimeMillis(), - this.receiveMessageTotalCount.get(), - this.born2ConsumerTotalRT.get(), - this.store2ConsumerTotalRT.get(), - this.born2ConsumerMaxRT.get(), - this.store2ConsumerMaxRT.get(), - }; - - return snap; - } - - - public AtomicLong getReceiveMessageTotalCount() { - return receiveMessageTotalCount; - } - - - public AtomicLong getBorn2ConsumerTotalRT() { - return born2ConsumerTotalRT; - } - - - public AtomicLong getStore2ConsumerTotalRT() { - return store2ConsumerTotalRT; - } - - - public AtomicLong getBorn2ConsumerMaxRT() { - return born2ConsumerMaxRT; - } - - - public AtomicLong getStore2ConsumerMaxRT() { - return store2ConsumerMaxRT; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java deleted file mode 100644 index b0351c6..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.benchmark; - -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.client.producer.DefaultMQProducer; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import com.alibaba.rocketmq.srvutil.ServerUtil; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; -import org.slf4j.Logger; - -import java.io.UnsupportedEncodingException; -import java.util.LinkedList; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; - -public class Producer { - public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { - - Options options = ServerUtil.buildCommandlineOptions(new Options()); - CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser()); - if (null == commandLine) { - System.exit(-1); - } - - final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; - final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64; - final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128; - final boolean keyEnable = commandLine.hasOption('k') ? Boolean.parseBoolean(commandLine.getOptionValue('k')) : false; - - System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable); - - final Logger log = ClientLogger.getLog(); - - final Message msg = buildMessage(messageSize, topic); - - final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); - - final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer(); - - final Timer timer = new Timer("BenchmarkTimerThread", true); - - final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>(); - - timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - snapshotList.addLast(statsBenchmark.createSnapshot()); - if (snapshotList.size() > 10) { - snapshotList.removeFirst(); - } - } - }, 1000, 1000); - - timer.scheduleAtFixedRate(new TimerTask() { - private void printStats() { - if (snapshotList.size() >= 10) { - Long[] begin = snapshotList.getFirst(); - Long[] end = snapshotList.getLast(); - - final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); - final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); - - System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n", - sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); - } - } - - - @Override - public void run() { - try { - this.printStats(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, 10000, 10000); - - final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer"); - producer.setInstanceName(Long.toString(System.currentTimeMillis())); - - if (commandLine.hasOption('n')) { - String ns = commandLine.getOptionValue('n'); - producer.setNamesrvAddr(ns); - } - - producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE); - - producer.start(); - - for (int i = 0; i < threadCount; i++) { - sendThreadPool.execute(new Runnable() { - @Override - public void run() { - while (true) { - try { - final long beginTimestamp = System.currentTimeMillis(); - if (keyEnable) { - msg.setKeys(String.valueOf(beginTimestamp / 1000)); - } - producer.send(msg); - statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); - statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); - final long currentRT = System.currentTimeMillis() - beginTimestamp; - statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT); - long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); - while (currentRT > prevMaxRT) { - boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT); - if (updated) - break; - - prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); - } - } catch (RemotingException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); - log.error("[BENCHMARK_PRODUCER] Send Exception", e); - - try { - Thread.sleep(3000); - } catch (InterruptedException e1) { - } - } catch (InterruptedException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); - try { - Thread.sleep(3000); - } catch (InterruptedException e1) { - } - } catch (MQClientException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); - log.error("[BENCHMARK_PRODUCER] Send Exception", e); - } catch (MQBrokerException e) { - statsBenchmark.getReceiveResponseFailedCount().incrementAndGet(); - log.error("[BENCHMARK_PRODUCER] Send Exception", e); - try { - Thread.sleep(3000); - } catch (InterruptedException e1) { - } - } - } - } - }); - } - } - - public static Options buildCommandlineOptions(final Options options) { - Option opt = new Option("w", "threadCount", true, "Thread count, Default: 64"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("s", "messageSize", true, "Message Size, Default: 128"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("k", "keyEnable", true, "Message Key Enable, Default: false"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest"); - opt.setRequired(false); - options.addOption(opt); - - return options; - } - - private static Message buildMessage(final int messageSize, final String topic) throws UnsupportedEncodingException { - Message msg = new Message(); - msg.setTopic(topic); - - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < messageSize; i += 10) { - sb.append("hello baby"); - } - - msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); - - return msg; - } -} - - -class StatsBenchmarkProducer { - private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); - - private final AtomicLong sendRequestFailedCount = new AtomicLong(0L); - - private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L); - - private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L); - - private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L); - - private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); - - - public Long[] createSnapshot() { - Long[] snap = new Long[]{ - System.currentTimeMillis(), - this.sendRequestSuccessCount.get(), - this.sendRequestFailedCount.get(), - this.receiveResponseSuccessCount.get(), - this.receiveResponseFailedCount.get(), - this.sendMessageSuccessTimeTotal.get(), - }; - - return snap; - } - - - public AtomicLong getSendRequestSuccessCount() { - return sendRequestSuccessCount; - } - - - public AtomicLong getSendRequestFailedCount() { - return sendRequestFailedCount; - } - - - public AtomicLong getReceiveResponseSuccessCount() { - return receiveResponseSuccessCount; - } - - - public AtomicLong getReceiveResponseFailedCount() { - return receiveResponseFailedCount; - } - - - public AtomicLong getSendMessageSuccessTimeTotal() { - return sendMessageSuccessTimeTotal; - } - - - public AtomicLong getSendMessageMaxRT() { - return sendMessageMaxRT; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java deleted file mode 100644 index 3dffd2f..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.example.benchmark; - -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.producer.*; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; - -import java.io.UnsupportedEncodingException; -import java.util.LinkedList; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; - -public class TransactionProducer { - private static int threadCount; - private static int messageSize; - private static boolean ischeck; - private static boolean ischeckffalse; - - - public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { - threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32; - messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2; - ischeck = args.length >= 3 ? Boolean.parseBoolean(args[2]) : false; - ischeckffalse = args.length >= 4 ? Boolean.parseBoolean(args[3]) : false; - - final Message msg = buildMessage(messageSize); - - final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); - - final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer(); - - final Timer timer = new Timer("BenchmarkTimerThread", true); - - final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>(); - - timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - snapshotList.addLast(statsBenchmark.createSnapshot()); - while (snapshotList.size() > 10) { - snapshotList.removeFirst(); - } - } - }, 1000, 1000); - - timer.scheduleAtFixedRate(new TimerTask() { - private void printStats() { - if (snapshotList.size() >= 10) { - Long[] begin = snapshotList.getFirst(); - Long[] end = snapshotList.getLast(); - - final long sendTps = - (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); - final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); - - System.out.printf( - "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", - sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]); - } - } - - - @Override - public void run() { - try { - this.printStats(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, 10000, 10000); - - final TransactionCheckListener transactionCheckListener = - new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark); - final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); - producer.setInstanceName(Long.toString(System.currentTimeMillis())); - producer.setTransactionCheckListener(transactionCheckListener); - producer.setDefaultTopicQueueNums(1000); - producer.start(); - - final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck); - - for (int i = 0; i < threadCount; i++) { - sendThreadPool.execute(new Runnable() { - @Override - public void run() { - while (true) { - try { - // Thread.sleep(1000); - final long beginTimestamp = System.currentTimeMillis(); - SendResult sendResult = - producer.sendMessageInTransaction(msg, tranExecuter, null); - if (sendResult != null) { - statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); - statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); - } - - final long currentRT = System.currentTimeMillis() - beginTimestamp; - statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT); - long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); - while (currentRT > prevMaxRT) { - boolean updated = - statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, - currentRT); - if (updated) - break; - - prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); - } - } catch (MQClientException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); - } - } - } - }); - } - } - - - private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException { - Message msg = new Message(); - msg.setTopic("BenchmarkTest"); - - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < messageSize; i += 10) { - sb.append("hello baby"); - } - - msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); - - return msg; - } -} - - -class TransactionExecuterBImpl implements LocalTransactionExecuter { - - private boolean ischeck; - - - public TransactionExecuterBImpl(boolean ischeck) { - this.ischeck = ischeck; - } - - - @Override - public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { - if (ischeck) { - return LocalTransactionState.UNKNOW; - } - return LocalTransactionState.COMMIT_MESSAGE; - } -} - - -class TransactionCheckListenerBImpl implements TransactionCheckListener { - private boolean ischeckffalse; - private StatsBenchmarkTProducer statsBenchmarkTProducer; - - - public TransactionCheckListenerBImpl(boolean ischeckffalse, - StatsBenchmarkTProducer statsBenchmarkTProducer) { - this.ischeckffalse = ischeckffalse; - this.statsBenchmarkTProducer = statsBenchmarkTProducer; - } - - - @Override - public LocalTransactionState checkLocalTransactionState(MessageExt msg) { - statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet(); - if (ischeckffalse) { - - return LocalTransactionState.ROLLBACK_MESSAGE; - } - - return LocalTransactionState.COMMIT_MESSAGE; - } -} - - -class StatsBenchmarkTProducer { - private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); - - private final AtomicLong sendRequestFailedCount = new AtomicLong(0L); - - private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L); - - private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L); - - private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L); - - private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); - - private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L); - - - public Long[] createSnapshot() { - Long[] snap = new Long[]{ - System.currentTimeMillis(), - this.sendRequestSuccessCount.get(), - this.sendRequestFailedCount.get(), - this.receiveResponseSuccessCount.get(), - this.receiveResponseFailedCount.get(), - this.sendMessageSuccessTimeTotal.get(), - this.checkRequestSuccessCount.get()}; - - return snap; - } - - - public AtomicLong getSendRequestSuccessCount() { - return sendRequestSuccessCount; - } - - - public AtomicLong getSendRequestFailedCount() { - return sendRequestFailedCount; - } - - - public AtomicLong getReceiveResponseSuccessCount() { - return receiveResponseSuccessCount; - } - - - public AtomicLong getReceiveResponseFailedCount() { - return receiveResponseFailedCount; - } - - - public AtomicLong getSendMessageSuccessTimeTotal() { - return sendMessageSuccessTimeTotal; - } - - - public AtomicLong getSendMessageMaxRT() { - return sendMessageMaxRT; - } - - - public AtomicLong getCheckRequestSuccessCount() { - return checkRequestSuccessCount; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java deleted file mode 100644 index 6cc6238..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.broadcast; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; - -import java.util.List; - -public class PushConsumer { - - public static void main(String[] args) throws InterruptedException, MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); - - consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); - - consumer.setMessageModel(MessageModel.BROADCASTING); - - consumer.subscribe("TopicTest", "TagA || TagC || TagD"); - - consumer.registerMessageListener(new MessageListenerConcurrently() { - - @Override - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { - System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - }); - - consumer.start(); - System.out.printf("Broadcast Consumer Started.%n"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java b/example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java deleted file mode 100644 index 104e6d9..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.filter; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.message.MessageExt; - -import java.util.List; - - -public class Consumer { - - public static void main(String[] args) throws InterruptedException, MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); - - String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java"); - consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", - filterCode); - - consumer.registerMessageListener(new MessageListenerConcurrently() { - - @Override - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { - System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - }); - - consumer.start(); - - System.out.printf("Consumer Started.%n"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java deleted file mode 100644 index 04251fa..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.filter; - -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.producer.DefaultMQProducer; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; - -public class Producer { - public static void main(String[] args) throws MQClientException, InterruptedException { - DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); - producer.start(); - - try { - for (int i = 0; i < 6000000; i++) { - Message msg = new Message("TopicFilter7", - "TagA", - "OrderID001", - "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); - - msg.putUserProperty("SequenceId", String.valueOf(i)); - SendResult sendResult = producer.send(msg); - System.out.printf("%s%n", sendResult); - } - } catch (Exception e) { - e.printStackTrace(); - } - producer.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java b/example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java deleted file mode 100644 index f6ba067..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.operation; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.message.MessageExt; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; - -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - - -public class Consumer { - - public static void main(String[] args) throws InterruptedException, MQClientException { - CommandLine commandLine = buildCommandline(args); - if (commandLine != null) { - String group = commandLine.getOptionValue('g'); - String topic = commandLine.getOptionValue('t'); - String subscription = commandLine.getOptionValue('s'); - final String returnFailedHalf = commandLine.getOptionValue('f'); - - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); - consumer.setInstanceName(Long.toString(System.currentTimeMillis())); - - consumer.subscribe(topic, subscription); - - consumer.registerMessageListener(new MessageListenerConcurrently() { - AtomicLong consumeTimes = new AtomicLong(0); - - - @Override - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { - long currentTimes = this.consumeTimes.incrementAndGet(); - System.out.printf("%-8d %s%n", currentTimes, msgs); - if (Boolean.parseBoolean(returnFailedHalf)) { - if ((currentTimes % 2) == 0) { - return ConsumeConcurrentlyStatus.RECONSUME_LATER; - } - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - }); - - consumer.start(); - - System.out.printf("Consumer Started.%n"); - } - } - - public static CommandLine buildCommandline(String[] args) { - final Options options = new Options(); - Option opt = new Option("h", "help", false, "Print help"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("g", "consumerGroup", true, "Consumer Group Name"); - opt.setRequired(true); - options.addOption(opt); - - opt = new Option("t", "topic", true, "Topic Name"); - opt.setRequired(true); - options.addOption(opt); - - opt = new Option("s", "subscription", true, "subscription"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("f", "returnFailedHalf", true, "return failed result, for half message"); - opt.setRequired(true); - options.addOption(opt); - - PosixParser parser = new PosixParser(); - HelpFormatter hf = new HelpFormatter(); - hf.setWidth(110); - CommandLine commandLine = null; - try { - commandLine = parser.parse(options, args); - if (commandLine.hasOption('h')) { - hf.printHelp("producer", options, true); - return null; - } - } catch (ParseException e) { - hf.printHelp("producer", options, true); - return null; - } - - return commandLine; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java deleted file mode 100644 index 816e3e8..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.operation; - -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.producer.DefaultMQProducer; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import org.apache.commons.cli.*; - -public class Producer { - - public static void main(String[] args) throws MQClientException, InterruptedException { - CommandLine commandLine = buildCommandline(args); - if (commandLine != null) { - String group = commandLine.getOptionValue('g'); - String topic = commandLine.getOptionValue('t'); - String tags = commandLine.getOptionValue('a'); - String keys = commandLine.getOptionValue('k'); - String msgCount = commandLine.getOptionValue('c'); - - DefaultMQProducer producer = new DefaultMQProducer(group); - producer.setInstanceName(Long.toString(System.currentTimeMillis())); - - producer.start(); - - for (int i = 0; i < Integer.parseInt(msgCount); i++) { - try { - Message msg = new Message( - topic, - tags, - keys, - ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); - SendResult sendResult = producer.send(msg); - System.out.printf("%-8d %s%n", i, sendResult); - } catch (Exception e) { - e.printStackTrace(); - Thread.sleep(1000); - } - } - - producer.shutdown(); - } - } - - public static CommandLine buildCommandline(String[] args) { - final Options options = new Options(); - Option opt = new Option("h", "help", false, "Print help"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("g", "producerGroup", true, "Producer Group Name"); - opt.setRequired(true); - options.addOption(opt); - - opt = new Option("t", "topic", true, "Topic Name"); - opt.setRequired(true); - options.addOption(opt); - - opt = new Option("a", "tags", true, "Tags Name"); - opt.setRequired(true); - options.addOption(opt); - - opt = new Option("k", "keys", true, "Keys Name"); - opt.setRequired(true); - options.addOption(opt); - - opt = new Option("c", "msgCount", true, "Message Count"); - opt.setRequired(true); - options.addOption(opt); - - PosixParser parser = new PosixParser(); - HelpFormatter hf = new HelpFormatter(); - hf.setWidth(110); - CommandLine commandLine = null; - try { - commandLine = parser.parse(options, args); - if (commandLine.hasOption('h')) { - hf.printHelp("producer", options, true); - return null; - } - } catch (ParseException e) { - hf.printHelp("producer", options, true); - return null; - } - - return commandLine; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java b/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java deleted file mode 100644 index 7b5f657..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.ordermessage; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.message.MessageExt; - -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - - -public class Consumer { - - public static void main(String[] args) throws MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); - - consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); - - consumer.subscribe("TopicTest", "TagA || TagC || TagD"); - - consumer.registerMessageListener(new MessageListenerOrderly() { - AtomicLong consumeTimes = new AtomicLong(0); - - @Override - public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { - context.setAutoCommit(false); - System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); - this.consumeTimes.incrementAndGet(); - if ((this.consumeTimes.get() % 2) == 0) { - return ConsumeOrderlyStatus.SUCCESS; - } else if ((this.consumeTimes.get() % 3) == 0) { - return ConsumeOrderlyStatus.ROLLBACK; - } else if ((this.consumeTimes.get() % 4) == 0) { - return ConsumeOrderlyStatus.COMMIT; - } else if ((this.consumeTimes.get() % 5) == 0) { - context.setSuspendCurrentQueueTimeMillis(3000); - return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; - } - - return ConsumeOrderlyStatus.SUCCESS; - } - }); - - consumer.start(); - System.out.printf("Consumer Started.%n"); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java deleted file mode 100644 index 609aa62..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.ordermessage; - -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.producer.DefaultMQProducer; -import com.alibaba.rocketmq.client.producer.MQProducer; -import com.alibaba.rocketmq.client.producer.MessageQueueSelector; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingException; - -import java.io.UnsupportedEncodingException; -import java.util.List; - -public class Producer { - public static void main(String[] args) throws UnsupportedEncodingException { - try { - MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); - producer.start(); - - String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; - for (int i = 0; i < 100; i++) { - int orderId = i % 10; - Message msg = - new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, - ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); - SendResult sendResult = producer.send(msg, new MessageQueueSelector() { - @Override - public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { - Integer id = (Integer) arg; - int index = id % mqs.size(); - return mqs.get(index); - } - }, orderId); - - System.out.printf("%s%n", sendResult); - } - - producer.shutdown(); - } catch (MQClientException e) { - e.printStackTrace(); - } catch (RemotingException e) { - e.printStackTrace(); - } catch (MQBrokerException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java deleted file mode 100644 index adac497..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.quickstart; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.message.MessageExt; - -import java.util.List; - -public class Consumer { - - public static void main(String[] args) throws InterruptedException, MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); - - consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); - - consumer.subscribe("TopicTest", "*"); - - consumer.registerMessageListener(new MessageListenerConcurrently() { - - @Override - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { - System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - }); - - consumer.start(); - System.out.printf("Consumer Started.%n"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java deleted file mode 100644 index fb5dbea..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.quickstart; - -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.producer.DefaultMQProducer; -import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; -import com.alibaba.rocketmq.client.producer.LocalTransactionState; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; - -public class Producer { - public static void main(String[] args) throws MQClientException, InterruptedException { - DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); - producer.start(); - - for (int i = 0; i < 1000; i++) { - try { - Message msg = new Message("TopicTest", - "TagA", - ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) - ); - SendResult sendResult = producer.send(msg); - LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() { - @Override - public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { - return null; - } - }; - System.out.printf("%s%n", sendResult); - } catch (Exception e) { - e.printStackTrace(); - Thread.sleep(1000); - } - } - producer.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java deleted file mode 100644 index 1a8f07e..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.simple; - -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.producer.DefaultMQProducer; -import com.alibaba.rocketmq.client.producer.SendCallback; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; - -import java.io.UnsupportedEncodingException; - - -public class AsyncProducer { - public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { - - DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); - producer.start(); - producer.setRetryTimesWhenSendAsyncFailed(0); - - for (int i = 0; i < 10000000; i++) { - try { - final int index = i; - Message msg = new Message("Jodie_topic_1023", - "TagA", - "OrderID188", - "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); - producer.send(msg, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); - } - - @Override - public void onException(Throwable e) { - System.out.printf("%-10d Exception %s %n", index, e); - e.printStackTrace(); - } - }); - } catch (Exception e) { - e.printStackTrace(); - } - } - producer.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java deleted file mode 100644 index 7beb064..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.example.simple; - -import com.alibaba.rocketmq.common.message.MessageExt; - -import java.util.TreeMap; - - -public class CachedQueue { - private final TreeMap<Long, MessageExt> msgCachedTable = new TreeMap<Long, MessageExt>(); - - - public TreeMap<Long, MessageExt> getMsgCachedTable() { - return msgCachedTable; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java deleted file mode 100644 index e0010d4..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.simple; - -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.producer.DefaultMQProducer; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; - - -public class Producer { - public static void main(String[] args) throws MQClientException, InterruptedException { - - DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); - - producer.start(); - - for (int i = 0; i < 10000000; i++) - try { - { - Message msg = new Message("TopicTest", - "TagA", - "OrderID188", - "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); - SendResult sendResult = producer.send(msg); - System.out.printf("%s%n", sendResult); - } - - } catch (Exception e) { - e.printStackTrace(); - } - - producer.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java deleted file mode 100644 index 6245769..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.simple; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; -import com.alibaba.rocketmq.client.consumer.PullResult; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -public class PullConsumer { - private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>(); - - - public static void main(String[] args) throws MQClientException { - DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); - - consumer.start(); - - Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); - for (MessageQueue mq : mqs) { - System.out.printf("Consume from the queue: " + mq + "%n"); - SINGLE_MQ: - while (true) { - try { - PullResult pullResult = - consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); - System.out.printf("%s%n", pullResult); - putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); - switch (pullResult.getPullStatus()) { - case FOUND: - break; - case NO_MATCHED_MSG: - break; - case NO_NEW_MSG: - break SINGLE_MQ; - case OFFSET_ILLEGAL: - break; - default: - break; - } - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - consumer.shutdown(); - } - - private static long getMessageQueueOffset(MessageQueue mq) { - Long offset = OFFSE_TABLE.get(mq); - if (offset != null) - return offset; - - return 0; - } - - private static void putMessageQueueOffset(MessageQueue mq, long offset) { - OFFSE_TABLE.put(mq, offset); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java deleted file mode 100644 index 25d668c..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.simple; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; -import com.alibaba.rocketmq.client.consumer.PullResult; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.message.MessageQueue; - -public class PullConsumerTest { - public static void main(String[] args) throws MQClientException { - DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); - consumer.start(); - - try { - MessageQueue mq = new MessageQueue(); - mq.setQueueId(0); - mq.setTopic("TopicTest3"); - mq.setBrokerName("vivedeMacBook-Pro.local"); - - long offset = 26; - - long beginTime = System.currentTimeMillis(); - PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32); - System.out.printf("%s%n", System.currentTimeMillis() - beginTime); - System.out.printf("%s%n", pullResult); - } catch (Exception e) { - e.printStackTrace(); - } - - consumer.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java deleted file mode 100644 index 0c86cf8..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.example.simple; - -import com.alibaba.rocketmq.client.consumer.MQPullConsumer; -import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleService; -import com.alibaba.rocketmq.client.consumer.PullResult; -import com.alibaba.rocketmq.client.consumer.PullTaskCallback; -import com.alibaba.rocketmq.client.consumer.PullTaskContext; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; - - -public class PullScheduleService { - - public static void main(String[] args) throws MQClientException { - final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1"); - - scheduleService.setMessageModel(MessageModel.CLUSTERING); - scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() { - - @Override - public void doPullTask(MessageQueue mq, PullTaskContext context) { - MQPullConsumer consumer = context.getPullConsumer(); - try { - - long offset = consumer.fetchConsumeOffset(mq, false); - if (offset < 0) - offset = 0; - - PullResult pullResult = consumer.pull(mq, "*", offset, 32); - System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult); - switch (pullResult.getPullStatus()) { - case FOUND: - break; - case NO_MATCHED_MSG: - break; - case NO_NEW_MSG: - case OFFSET_ILLEGAL: - break; - default: - break; - } - consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); - - - context.setPullNextDelayTimeMillis(100); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - - scheduleService.start(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java deleted file mode 100644 index 5628ced..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.simple; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.message.MessageExt; - -import java.util.List; - - -public class PushConsumer { - - public static void main(String[] args) throws InterruptedException, MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); - consumer.subscribe("Jodie_topic_1023", "*"); - consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); - consumer.registerMessageListener(new MessageListenerConcurrently() { - - /** - - */ - @Override - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { - System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - }); - consumer.start(); - System.out.printf("Consumer Started.%n"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java deleted file mode 100644 index fc6bacd..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.example.simple; - -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - - -public class RandomAsyncCommit { - private final ConcurrentHashMap<MessageQueue, CachedQueue> mqCachedTable = - new ConcurrentHashMap<MessageQueue, CachedQueue>(); - - - public void putMessages(final MessageQueue mq, final List<MessageExt> msgs) { - CachedQueue cachedQueue = this.mqCachedTable.get(mq); - if (null == cachedQueue) { - cachedQueue = new CachedQueue(); - this.mqCachedTable.put(mq, cachedQueue); - } - for (MessageExt msg : msgs) { - cachedQueue.getMsgCachedTable().put(msg.getQueueOffset(), msg); - } - } - - - public void removeMessage(final MessageQueue mq, long offset) { - CachedQueue cachedQueue = this.mqCachedTable.get(mq); - if (null != cachedQueue) { - cachedQueue.getMsgCachedTable().remove(offset); - } - } - - - public long commitableOffset(final MessageQueue mq) { - CachedQueue cachedQueue = this.mqCachedTable.get(mq); - if (null != cachedQueue) { - return cachedQueue.getMsgCachedTable().firstKey(); - } - - return -1; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java deleted file mode 100644 index 68347a6..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.simple; - -import com.alibaba.rocketmq.client.QueryResult; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.producer.DefaultMQProducer; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; - -public class TestProducer { - public static void main(String[] args) throws MQClientException, InterruptedException { - DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); - producer.start(); - - for (int i = 0; i < 1; i++) - try { - { - Message msg = new Message("TopicTest1", - "TagA", - "key113", - "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); - SendResult sendResult = producer.send(msg); - System.out.printf("%s%n", sendResult); - - QueryResult queryMessage = - producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis()); - for (MessageExt m : queryMessage.getMessageList()) { - System.out.printf("%s%n", m); - } - } - - } catch (Exception e) { - e.printStackTrace(); - } - producer.shutdown(); - } -}