This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 52ffdc4 [tests] improve BatchMessageTest#testSimpleBatchProducerConsumer (#2629) 52ffdc4 is described below commit 52ffdc4ca7188271e98fe600ba82539c3dac6325 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Sat Sep 22 11:56:06 2018 -0700 [tests] improve BatchMessageTest#testSimpleBatchProducerConsumer (#2629) *Motivation* The batching behavior in BatchMessageTest#testSimpleBatchProducerConsumer is non-deterministic. *Changes* Disable time-based and size-based batching, and explicitly flush messages, so that the test runs deterministically. --- .../apache/pulsar/broker/service/BatchMessageTest.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 5be8b15..1db8ed8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -41,7 +41,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; @@ -249,14 +248,24 @@ public class BatchMessageTest extends BrokerTestBase { .subscribe(); consumer.close(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).compressionType(compressionType) - .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true) - .create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) 
.compressionType(compressionType) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + // disabled time based batch by setting delay to a large enough value + .batchingMaxPublishDelay(60, TimeUnit.HOURS) + // disabled size based batch + .batchingMaxMessages(2 * numMsgs) + .enableBatching(true) + .create(); List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList(); for (int i = 0; i < numMsgs; i++) { byte[] message = ("msg-" + i).getBytes(); sendFutureList.add(producer.sendAsync(message)); + if ((i + 1) % numMsgsInBatch == 0) { + producer.flush(); + LOG.info("Flush {} messages", (i + 1)); + } } FutureUtil.waitForAll(sendFutureList).get();