This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 52ffdc4  [tests] improve BatchMessageTest#testSimpleBatchProducerConsumer (#2629)
52ffdc4 is described below

commit 52ffdc4ca7188271e98fe600ba82539c3dac6325
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Sat Sep 22 11:56:06 2018 -0700

    [tests] improve BatchMessageTest#testSimpleBatchProducerConsumer (#2629)
    
    *Motivation*
    
    The batching behavior in BatchMessageTest#testSimpleBatchProducerConsumer is non-deterministic.
    
    *Changes*
    
    Disable time-based and size-based batching, and explicitly flush messages, so that the test runs deterministically.
---
 .../apache/pulsar/broker/service/BatchMessageTest.java  | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 5be8b15..1db8ed8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -41,7 +41,6 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
@@ -249,14 +248,24 @@ public class BatchMessageTest extends BrokerTestBase {
                 .subscribe();
         consumer.close();
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).compressionType(compressionType)
-                .batchingMaxPublishDelay(5, 
TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .compressionType(compressionType)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            // disabled time based batch by setting delay to a large enough 
value
+            .batchingMaxPublishDelay(60, TimeUnit.HOURS)
+            // disabled size based batch
+            .batchingMaxMessages(2 * numMsgs)
+            .enableBatching(true)
+            .create();
 
         List<CompletableFuture<MessageId>> sendFutureList = 
Lists.newArrayList();
         for (int i = 0; i < numMsgs; i++) {
             byte[] message = ("msg-" + i).getBytes();
             sendFutureList.add(producer.sendAsync(message));
+            if ((i + 1) % numMsgsInBatch == 0) {
+                producer.flush();
+                LOG.info("Flush {} messages", (i + 1));
+            }
         }
         FutureUtil.waitForAll(sendFutureList).get();
 

Reply via email to