sijie closed pull request #1440: Update default values for a few publisher 
settings
URL: https://github.com/apache/incubator-pulsar/pull/1440
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index d3d15d926..ff85e3436 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -28,6 +28,7 @@
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
@@ -74,7 +75,10 @@ public AbstractReplicator(String topicName, String 
replicatorPrefix, String loca
         this.producerQueueSize = 
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
 
         this.producerBuilder = client.newProducer() //
-                .topic(topicName).sendTimeout(0, TimeUnit.SECONDS) //
+                .topic(topicName)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .enableBatching(false)
+                .sendTimeout(0, TimeUnit.SECONDS) //
                 .maxPendingMessages(producerQueueSize) //
                 .producerName(getReplicatorName(replicatorPrefix, 
localCluster));
         STATE_UPDATER.set(this, State.Stopped);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 14fc41b56..2524fa2ce 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -640,7 +640,11 @@ public void namespaces() throws PulsarAdminException, 
PulsarServerException, Exc
         assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), new 
PersistencePolicies(3, 2, 1, 10.0));
 
         // Force topic creation and namespace being loaded
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/my-topic").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/ns1/my-topic")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.close();
         admin.persistentTopics().delete("persistent://prop-xyz/ns1/my-topic");
 
@@ -804,8 +808,11 @@ public void partitionedTopics(String topicName) throws 
Exception {
         admin.persistentTopics().deleteSubscription(partitionedTopicName, 
"my-sub-1");
         
assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), 
Lists.newArrayList("my-sub"));
 
-        Producer<byte[]> producer = 
client.newProducer().topic(partitionedTopicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic(partitionedTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
@@ -859,7 +866,11 @@ public void partitionedTopics(String topicName) throws 
Exception {
         } catch (ConflictException ce) {
         }
 
-        producer = client.newProducer().topic(partitionedTopicName).create();
+        producer = client.newProducer()
+            .topic(partitionedTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         topics = admin.persistentTopics().getList("prop-xyz/ns1");
         assertEquals(topics.size(), 4);
@@ -918,7 +929,11 @@ public void testNamespaceSplitBundle() throws Exception {
         // Force to create a topic
         final String namespace = "prop-xyz/ns1";
         final String topicName = (new 
StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.send("message".getBytes());
         publishMessagesOnPersistentTopic(topicName, 0);
         assertEquals(admin.persistentTopics().getList(namespace), 
Lists.newArrayList(topicName));
@@ -944,7 +959,11 @@ public void testNamespaceSplitBundleConcurrent() throws 
Exception {
         // Force to create a topic
         final String namespace = "prop-xyz/ns1";
         final String topicName = (new 
StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.send("message".getBytes());
         publishMessagesOnPersistentTopic(topicName, 0);
         assertEquals(admin.persistentTopics().getList(namespace), 
Lists.newArrayList(topicName));
@@ -1037,7 +1056,11 @@ public void testNamespaceUnloadBundle() throws Exception 
{
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/ds2").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/ns1/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1095,7 +1118,11 @@ public void testNamespaceBundleUnload(Integer 
numBundles) throws Exception {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds2").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/ns1-bundles/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1148,7 +1175,11 @@ public void testClearBacklogOnNamespace(Integer 
numBundles) throws Exception {
                 .subscribe();
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds2").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/ns1-bundles/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1157,7 +1188,11 @@ public void testClearBacklogOnNamespace(Integer 
numBundles) throws Exception {
         producer.close();
 
         // Create producer
-        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds1").create();
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/ns1-bundles/ds1")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer1.send(message.getBytes());
@@ -1251,7 +1286,11 @@ private void publishMessagesOnPersistentTopic(String 
topicName, int messages) th
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int 
messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -1296,7 +1335,11 @@ public void statsOnNonExistingTopics() throws Exception {
     @Test
     public void testDeleteFailedReturnCode() throws Exception {
         String topicName = "persistent://prop-xyz/ns1/my-topic";
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         try {
             admin.persistentTopics().delete(topicName);
@@ -1684,8 +1727,11 @@ public void 
testPersistentTopicExpireMessageOnParitionTopic() throws Exception {
         Consumer<byte[]> consumer = 
client.newConsumer().topic("persistent://prop-xyz/ns1/ds1")
                 .subscriptionName("my-sub").subscribe();
 
-        Producer<byte[]> producer = 
client.newProducer().topic("persistent://prop-xyz/ns1/ds1")
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic("persistent://prop-xyz/ns1/ds1")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index c53396a39..ef23f1973 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -173,8 +173,11 @@ public void testIncrementPartitionsOfTopic() throws 
Exception {
                 .toString();
 
         // (3) produce messages to all partitions including newly created 
partitions (RoundRobin)
-        Producer<byte[]> producer = 
client.newProducer().topic(partitionedTopicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic(partitionedTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
         final int totalMessages = newPartitions * 2;
         for (int i = 0; i < totalMessages; i++) {
             String message = "message-" + i;
@@ -268,7 +271,11 @@ public void nonPersistentTopics() throws Exception {
     }
 
     private void publishMessagesOnTopic(String topicName, int messages, int 
startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -331,7 +338,11 @@ public void 
testUpdatePersistencePolicyUpdateManagedCursor() throws Exception {
         admin.namespaces().setPersistence(namespace, new 
PersistencePolicies(3, 3, 3, 50.0));
         assertEquals(admin.namespaces().getPersistence(namespace), new 
PersistencePolicies(3, 3, 3, 50.0));
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
 
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -381,7 +392,7 @@ public void testUnloadTopic(final String topicType) throws 
Exception {
         
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
 
         // recreation of producer will load the topic again
-        producer = pulsarClient.newProducer().topic(topicName).create();
+        pulsarClient.newProducer().topic(topicName).create();
         topic = pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topic);
         // unload the topic
@@ -502,7 +513,11 @@ public void testResetCursorOnPosition(String 
namespaceName) throws Exception {
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int 
messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -741,7 +756,12 @@ public void testPublishConsumerStats() throws Exception {
         PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
         Consumer<byte[]> consumer = 
client.newConsumer().topic(topic).subscriptionName(subscriberName)
                 .subscriptionType(SubscriptionType.Shared).subscribe();
-        Producer<byte[]> producer = 
client.newProducer().topic(topic).producerName(producerName).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .producerName(producerName)
+            .create();
 
         retryStrategically((test) -> {
             PersistentTopicStats stats;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 7428afd87..f3762ac9d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -805,8 +805,11 @@ public void partitionedTopics(String topicName) throws 
Exception {
         admin.persistentTopics().deleteSubscription(partitionedTopicName, 
"my-sub-1");
         
assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), 
Lists.newArrayList("my-sub"));
 
-        Producer<byte[]> producer = 
client.newProducer().topic(partitionedTopicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic(partitionedTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
@@ -860,7 +863,11 @@ public void partitionedTopics(String topicName) throws 
Exception {
         } catch (ConflictException ce) {
         }
 
-        producer = client.newProducer().topic(partitionedTopicName).create();
+        producer = client.newProducer()
+            .topic(partitionedTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         topics = admin.persistentTopics().getList("prop-xyz/use/ns1");
         assertEquals(topics.size(), 4);
@@ -918,7 +925,11 @@ public void testNamespaceSplitBundle() throws Exception {
         // Force to create a topic
         final String namespace = "prop-xyz/use/ns1";
         final String topicName = (new 
StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.send("message".getBytes());
         publishMessagesOnPersistentTopic(topicName, 0);
         assertEquals(admin.persistentTopics().getList(namespace), 
Lists.newArrayList(topicName));
@@ -944,7 +955,11 @@ public void testNamespaceSplitBundleConcurrent() throws 
Exception {
         // Force to create a topic
         final String namespace = "prop-xyz/use/ns1";
         final String topicName = (new 
StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.send("message".getBytes());
         publishMessagesOnPersistentTopic(topicName, 0);
         assertEquals(admin.persistentTopics().getList(namespace), 
Lists.newArrayList(topicName));
@@ -1064,7 +1079,11 @@ public void testNamespaceUnloadBundle() throws Exception 
{
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1/ds2").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/use/ns1/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1121,8 +1140,11 @@ public void testNamespaceBundleUnload(Integer 
numBundles) throws Exception {
                 Lists.newArrayList("my-sub"));
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds2")
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/use/ns1-bundles/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1174,8 +1196,11 @@ public void testClearBacklogOnNamespace(Integer 
numBundles) throws Exception {
                 .subscribe();
 
         // Create producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds2")
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/use/ns1-bundles/ds2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
@@ -1184,8 +1209,11 @@ public void testClearBacklogOnNamespace(Integer 
numBundles) throws Exception {
         producer.close();
 
         // Create producer
-        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds1")
-                .create();
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+            .topic("persistent://prop-xyz/use/ns1-bundles/ds1")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer1.send(message.getBytes());
@@ -1278,7 +1306,11 @@ private void publishMessagesOnPersistentTopic(String 
topicName, int messages) th
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int 
messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -1323,7 +1355,11 @@ public void statsOnNonExistingTopics() throws Exception {
     @Test
     public void testDeleteFailedReturnCode() throws Exception {
         String topicName = "persistent://prop-xyz/use/ns1/my-topic";
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         try {
             admin.persistentTopics().delete(topicName);
@@ -1711,8 +1747,11 @@ public void 
testPersistentTopicExpireMessageOnParitionTopic() throws Exception {
         Consumer<byte[]> consumer = 
client.newConsumer().topic("persistent://prop-xyz/use/ns1/ds1")
                 .subscriptionName("my-sub").subscribe();
 
-        Producer<byte[]> producer = 
client.newProducer().topic("persistent://prop-xyz/use/ns1/ds1")
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic("persistent://prop-xyz/use/ns1/ds1")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
         for (int i = 0; i < 10; i++) {
             String message = "message-" + i;
             producer.send(message.getBytes());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
index 55b2b7b0f..781a932a4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
@@ -167,7 +167,9 @@ public void testIncrementPartitionsOfTopic() throws 
Exception {
 
         // (3) produce messages to all partitions including newly created 
partitions (RoundRobin)
         Producer<byte[]> producer = 
client.newProducer().topic(partitionedTopicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
         final int totalMessages = newPartitions * 2;
         for (int i = 0; i < totalMessages; i++) {
             String message = "message-" + i;
@@ -261,7 +263,11 @@ public void nonPersistentTopics() throws Exception {
     }
 
     private void publishMessagesOnTopic(String topicName, int messages, int 
startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -324,7 +330,11 @@ public void 
testUpdatePersistencePolicyUpdateManagedCursor() throws Exception {
         admin.namespaces().setPersistence(namespace, new 
PersistencePolicies(3, 3, 3, 50.0));
         assertEquals(admin.namespaces().getPersistence(namespace), new 
PersistencePolicies(3, 3, 3, 50.0));
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
 
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -495,7 +505,11 @@ public void testResetCursorOnPosition(String 
namespaceName) throws Exception {
     }
 
     private void publishMessagesOnPersistentTopic(String topicName, int 
messages, int startIdx) throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             String message = "message-" + i;
@@ -708,7 +722,11 @@ public void testNonPersistentTopics() throws Exception {
         Set<String> topicNames = Sets.newHashSet();
         for (int i = 0; i < totalTopics; i++) {
             topicNames.add(topicName + i);
-            Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName + i).create();
+            Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName + i)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
             producer.close();
         }
 
@@ -735,7 +753,12 @@ public void testPublishConsumerStats() throws Exception {
         PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
         Consumer<byte[]> consumer = 
client.newConsumer().topic(topic).subscriptionName(subscriberName)
                 .subscriptionType(SubscriptionType.Shared).subscribe();
-        Producer<byte[]> producer = 
client.newProducer().topic(topic).producerName(producerName).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic(topic)
+            .producerName(producerName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         retryStrategically((test) -> {
             PersistentTopicStats stats;
@@ -777,8 +800,11 @@ public void testTenantNameWithUnderscore() throws 
Exception {
 
         String topic = "persistent://prop_xyz/use/my-namespace/my-topic";
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopicStats stats = admin.persistentTopics().getStats(topic);
         assertEquals(stats.publishers.size(), 1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 9f9ab3549..87187c648 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -43,6 +43,7 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -530,10 +531,16 @@ public void testBatchAndNonBatchCumulativeAcks() throws 
Exception {
         consumer.close();
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                .batchingMaxPublishDelay(5, 
TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
-                .create();
+            .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
+            .batchingMaxMessages(numMsgsInBatch)
+            .enableBatching(true)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         // create producer to publish non batch messages
-        Producer<byte[]> noBatchProducer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> noBatchProducer = 
pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         List<CompletableFuture<MessageId>> sendFutureList = 
Lists.newArrayList();
         for (int i = 0; i < numMsgs / 2; i++) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 53c49d7bf..bc5c210b2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -159,7 +159,10 @@ public void testSimpleConsumerEventsWithoutPartition() 
throws Exception {
         assertEquals(subRef.getDispatcher().getType(), SubType.Failover);
 
         List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs);
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
             futures.add(producer.sendAsync(message.getBytes()));
@@ -354,7 +357,8 @@ public void testSimpleConsumerEventsWithPartition() throws 
Exception {
 
         List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs);
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
             futures.add(producer.sendAsync(message.getBytes()));
@@ -545,7 +549,10 @@ public void testActiveConsumerFailoverWithDelay() throws 
Exception {
 
         // enqueue messages
         List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs);
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
             futures.add(producer.sendAsync(message.getBytes()));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 2bc152a2d..4b659f45d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -39,6 +39,7 @@
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -99,7 +100,10 @@ public void testSimpleConsumerEvents() throws Exception {
         assertEquals(subRef.getDispatcher().getType(), SubType.Shared);
 
         List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs * 2);
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs * 2; i++) {
             String message = "my-message-" + i;
             futures.add(producer.sendAsync(message.getBytes()));
@@ -256,8 +260,11 @@ public void testConsumersWithDifferentPermits() throws 
Exception {
                 }).subscribe();
 
         List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs);
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).maxPendingMessages(numMsgs + 1)
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .maxPendingMessages(numMsgs + 1)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "msg-" + i;
             futures.add(producer.sendAsync(message.getBytes()));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index c04e3bd10..8da33cd3d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -53,6 +53,7 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -72,6 +73,7 @@
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
+import org.testng.annotations.Ignore;
 import org.testng.annotations.Test;
 
 /**
@@ -95,7 +97,11 @@ public void testSimpleProducerEvents() throws Exception {
         final String topicName = "persistent://prop/ns-abc/topic0";
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -136,7 +142,11 @@ public void testSimpleConsumerEvents() throws Exception {
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
         assertEquals(getAvailablePermits(subRef), 1000 /* default */);
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs * 2; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -210,7 +220,11 @@ public void testConsumerFlowControl() throws Exception {
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                 .receiverQueueSize(recvQueueSize).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -254,7 +268,11 @@ public void testActiveSubscriptionWithCache() throws 
Exception {
         // (1) Create subscription
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                 .receiverQueueSize(recvQueueSize).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // (2) Produce Messages
         for (int i = 0; i < recvQueueSize / 2; i++) {
@@ -325,7 +343,11 @@ public Void call() throws Exception {
             });
         }
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < recvQueueSize * numConsumersThreads; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -352,7 +374,11 @@ public void testGracefulClose() throws Exception {
         final String topicName = "persistent://prop/ns-abc/topic4";
         final String subName = "sub4";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
@@ -415,7 +441,11 @@ public void testSimpleCloseTopic() throws Exception {
         final String subName = "sub5";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -443,7 +473,11 @@ public void testSingleClientMultipleSubscriptions() throws 
Exception {
         final String subName = "sub6";
 
         
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
-        pulsarClient.newProducer().topic(topicName).create();
+        pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         try {
             
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
             fail("Should have thrown an exception since one consumer is 
already connected");
@@ -676,7 +710,11 @@ public void testMessageExpiry() throws Exception {
         consumer.close();
         assertFalse(subRef.getDispatcher().isConsumerConnected());
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -719,7 +757,11 @@ public void testMessageExpiryWithFewExpiredBacklog() 
throws Exception {
 
         assertTrue(subRef.getDispatcher().isConsumerConnected());
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -820,7 +862,11 @@ public void testReceiveWithTimeout() throws Exception {
 
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName(subName).receiverQueueSize(1000).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         assertEquals(consumer.getAvailablePermits(), 0);
 
@@ -847,7 +893,11 @@ public void testProducerReturnedMessageId() throws 
Exception {
         final String topicName = "persistent://prop/ns-abc/topic-xyz";
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -896,11 +946,17 @@ public void testProducerQueueFullBlocking() throws 
Exception {
         PulsarClient client = 
PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
 
         // 1. Producer connect
-        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
client.newProducer().topic(topicName)
-                
.maxPendingMessages(messages).blockIfQueueFull(true).sendTimeout(1, 
TimeUnit.SECONDS).create();
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
client.newProducer()
+            .topic(topicName)
+            .maxPendingMessages(messages)
+            .blockIfQueueFull(true)
+            .sendTimeout(1, TimeUnit.SECONDS)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Stop broker
-        cleanup();
+        super.internalCleanup();
 
         // 2. producer publish messages
         long startTime = System.nanoTime();
@@ -937,11 +993,17 @@ public void testProducerQueueFullNonBlocking() throws 
Exception {
 
         // 1. Producer connect
         PulsarClient client = 
PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
-        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
client.newProducer().topic(topicName)
-                
.maxPendingMessages(messages).blockIfQueueFull(false).sendTimeout(1, 
TimeUnit.SECONDS).create();
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
client.newProducer()
+            .topic(topicName)
+            .maxPendingMessages(messages)
+            .blockIfQueueFull(false)
+            .sendTimeout(1, TimeUnit.SECONDS)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Stop broker
-        cleanup();
+        super.internalCleanup();
 
         // 2. producer publish messages
         long startTime = System.nanoTime();
@@ -980,9 +1042,16 @@ public void testDeleteTopics() throws Exception {
         BrokerService brokerService = pulsar.getBrokerService();
 
         // 1. producers connect
-        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-1").create();
-        /* Producer<byte[]> producer2 = */ 
pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-2")
-                .create();
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+            .topic("persistent://prop/ns-abc/topic-1")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+        /* Producer<byte[]> producer2 = */ pulsarClient.newProducer()
+            .topic("persistent://prop/ns-abc/topic-2")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         brokerService.updateRates();
 
@@ -1021,8 +1090,12 @@ public void testCompression(CompressionType 
compressionType) throws Exception {
         final String topicName = "persistent://prop/ns-abc/topic0" + 
compressionType;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).compressionType(compressionType)
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .compressionType(compressionType)
+            .create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
@@ -1057,7 +1130,11 @@ public void testBrokerTopicStats() throws Exception {
         statsUpdater.shutdown();
 
         final String namespace = "prop/ns-abc";
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://" + namespace + 
"/topic0").create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://" + namespace + "/topic0")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         // 1. producer publish messages
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -1086,7 +1163,10 @@ public void testPayloadCorruptionDetection() throws 
Exception {
         final String topicName = "persistent://prop/ns-abc/topic1";
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
 
         Message<byte[]> msg1 = 
MessageBuilder.create().setContent("message-1".getBytes()).build();
@@ -1141,7 +1221,11 @@ public void testMessageRedelivery() throws Exception {
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                 .subscriptionType(SubscriptionType.Shared).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // (1) Produce messages
         for (int i = 0; i < totalMessages; i++) {
@@ -1196,7 +1280,11 @@ public void testMessageReplay() throws Exception {
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                 
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(1).subscribe();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -1258,8 +1346,11 @@ public void testMessageReplay() throws Exception {
     public void testCreateProducerWithSameName() throws Exception {
         String topic = 
"persistent://prop/ns-abc/testCreateProducerWithSameName";
 
-        ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topic)
-                .producerName("test-producer-a");
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+            .topic(topic)
+            .producerName("test-producer-a")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition);
         Producer<byte[]> p1 = producerBuilder.create();
 
         try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 50dfc3c00..8d2804f5f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -52,6 +52,7 @@
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawMessage;
@@ -209,7 +210,10 @@ public void testConcurrentReplicator() throws Exception {
         final TopicName topicName = 
TopicName.get(String.format("persistent://" + namespace + "/topic-%d", 0));
         PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
                 .build();
-        Producer<byte[]> producer = 
client1.newProducer().topic(topicName.toString()).create();
+        Producer<byte[]> producer = 
client1.newProducer().topic(topicName.toString())
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.close();
 
         PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getOrCreateTopic(topicName.toString()).get();
@@ -850,7 +854,10 @@ public void verifyChecksumAfterReplication() throws 
Exception {
         final String topicName = 
"persistent://pulsar/ns/checksumAfterReplication";
 
         PulsarClient c1 = 
PulsarClient.builder().serviceUrl(url1.toString()).build();
-        Producer<byte[]> p1 = c1.newProducer().topic(topicName).create();
+        Producer<byte[]> p1 = c1.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PulsarClient c2 = 
PulsarClient.builder().serviceUrl(url2.toString()).build();
         RawReader reader2 = RawReader.create(c2, topicName, "sub").get();
@@ -897,7 +904,10 @@ public void testReplicatorOnPartitionedTopic(boolean 
isPartitionedTopic) throws
 
         // load namespace with dummy topic on ns
         PulsarClient client = 
PulsarClient.builder().serviceUrl(url1.toString()).build();
-        client.newProducer().topic("persistent://" + namespace + 
"/dummyTopic").create();
+        client.newProducer().topic("persistent://" + namespace + "/dummyTopic")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // persistent topic test
         try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 5f5432cd1..f834a2415 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -37,6 +37,7 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -267,7 +268,11 @@ void shutdown() throws Exception {
             this.namespace = dest.getNamespace();
             this.topicName = dest.toString();
             client = 
PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, 
TimeUnit.SECONDS).build();
-            producer = client.newProducer().topic(topicName).create();
+            producer = client.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
         }
 
@@ -276,12 +281,11 @@ void shutdown() throws Exception {
             this.namespace = dest.getNamespace();
             this.topicName = dest.toString();
             client = 
PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, 
TimeUnit.SECONDS).build();
-            ProducerBuilder<byte[]> producerBuilder = 
client.newProducer().topic(topicName);
-            if (batch) {
-                producerBuilder.enableBatching(true);
-                producerBuilder.batchingMaxPublishDelay(1, TimeUnit.SECONDS);
-                producerBuilder.batchingMaxMessages(5);
-            }
+            ProducerBuilder<byte[]> producerBuilder = client.newProducer()
+                .topic(topicName)
+                .enableBatching(batch)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+                .batchingMaxMessages(5);
             producer = producerBuilder.create();
 
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index 2a590eb95..c5ab30bc0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -73,7 +73,10 @@ public void testExclusiveSingleAckedNormalTopic() throws 
Exception {
         HashSet<String> messageDataHashSet = new HashSet<String>();
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -159,7 +162,10 @@ public void testSharedSingleAckedNormalTopic() throws 
Exception {
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 10;
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
@@ -239,7 +245,10 @@ public void testFailoverSingleAckedNormalTopic() throws 
Exception {
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 10;
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
@@ -355,7 +364,10 @@ public void testExclusiveCumulativeAckedNormalTopic() 
throws Exception {
         final int totalMessages = 10;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
@@ -414,7 +426,9 @@ public void testExclusiveSingleAckedPartitionedTopic() 
throws Exception {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -468,7 +482,8 @@ public void testSharedSingleAckedPartitionedTopic() throws 
Exception {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         // 2. Create consumer
         ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topicName)
@@ -564,7 +579,9 @@ public void testFailoverSingleAckedPartitionedTopic() 
throws Exception {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
         ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topicName)
@@ -645,7 +662,10 @@ public void testFailoverInactiveConsumer() throws 
Exception {
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 10;
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index 549d2b79c..9bc273064 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -38,6 +38,7 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
@@ -65,7 +66,10 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testSimpleTermination() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         /* MessageId msgId1 = */producer.send("test-msg-1".getBytes());
         /* MessageId msgId2 = */producer.send("test-msg-2".getBytes());
@@ -84,7 +88,10 @@ public void testSimpleTermination() throws Exception {
 
     @Test
     public void testCreateProducerOnTerminatedTopic() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         /* MessageId msgId1 = */producer.send("test-msg-1".getBytes());
         /* MessageId msgId2 = */producer.send("test-msg-2".getBytes());
@@ -103,7 +110,10 @@ public void testCreateProducerOnTerminatedTopic() throws 
Exception {
 
     @Test(timeOut = 20000)
     public void testTerminateWhilePublishing() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         CyclicBarrier barrier = new CyclicBarrier(2);
         List<CompletableFuture<MessageId>> futures = new ArrayList<>();
@@ -148,7 +158,10 @@ public void testTerminateWhilePublishing() throws 
Exception {
 
     @Test
     public void testDoubleTerminate() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         /* MessageId msgId1 = */producer.send("test-msg-1".getBytes());
         /* MessageId msgId2 = */producer.send("test-msg-2".getBytes());
@@ -176,7 +189,10 @@ public void testTerminatePartitionedTopic() throws 
Exception {
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationConsumer() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-sub").subscribe();
 
@@ -211,7 +227,10 @@ public void testSimpleTerminationConsumer() throws 
Exception {
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationMessageListener() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         CountDownLatch latch = new CountDownLatch(1);
 
@@ -248,7 +267,10 @@ public void reachedEndOfTopic(Consumer<byte[]> consumer) {
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationReader() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         MessageId msgId1 = producer.send("test-msg-1".getBytes());
         MessageId msgId2 = producer.send("test-msg-2".getBytes());
@@ -277,7 +299,10 @@ public void testSimpleTerminationReader() throws Exception 
{
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationReaderListener() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         CountDownLatch latch = new CountDownLatch(1);
 
@@ -311,7 +336,10 @@ public void reachedEndOfTopic(Reader<byte[]> reader) {
 
     @Test(timeOut = 20000)
     public void testSubscribeOnTerminatedTopic() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         /* MessageId msgId1 = */ producer.send("test-msg-1".getBytes());
         MessageId msgId2 = producer.send("test-msg-2".getBytes());
 
@@ -327,7 +355,10 @@ public void testSubscribeOnTerminatedTopic() throws 
Exception {
 
     @Test(timeOut = 20000)
     public void testSubscribeOnTerminatedTopicWithNoMessages() throws 
Exception {
-        pulsarClient.newProducer().topic(topicName).create();
+        pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         admin.persistentTopics().terminateTopicAsync(topicName).get();
 
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 9b6281935..d23c49235 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -337,7 +337,8 @@ public void testPartitionTopicLookup() throws Exception {
         /**** broker-2 started ****/
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
                 .subscriptionName("my-partitioned-subscriber").subscribe();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index e0c46acfa..aba0e1b4d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -600,7 +600,9 @@ public void testBrokerSubscriptionRecovery(boolean 
unloadBundleGracefully) throw
                 
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, 
TimeUnit.SECONDS).subscribe();
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
-                .create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         CountDownLatch latch = new CountDownLatch(totalProducedMsgs);
         // (1) Produced Messages
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 52deb52f1..553eec89c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -147,7 +147,10 @@ public void 
testPartitionedNonPersistentTopic(SubscriptionType type) throws Exce
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("subscriber-1")
                 .subscriptionType(type).subscribe();
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         int totalProduceMsg = 500;
         for (int i = 0; i < totalProduceMsg; i++) {
@@ -191,7 +194,10 @@ public void 
testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type
         Consumer<byte[]> consumer = 
client.newConsumer().topic(topic).subscriptionName("subscriber-1")
                 .subscriptionType(type).subscribe();
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // Ensure all partitions exist
         for (int i = 0; i < numPartitions; i++) {
@@ -493,7 +499,10 @@ public void testReplicator() throws Exception {
             ConsumerImpl<byte[]> repl3Consumer = (ConsumerImpl<byte[]>) 
client3.newConsumer().topic(globalTopicName)
                     .subscriptionName("subscriber-1").subscribe();
 
-            Producer<byte[]> producer = 
client1.newProducer().topic(globalTopicName).create();
+            Producer<byte[]> producer = 
client1.newProducer().topic(globalTopicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
             Thread.sleep(timeWaitToSync);
 
@@ -778,7 +787,10 @@ public void testMsgDropStat() throws Exception {
             Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2")
                     
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe();
 
-            ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName).create();
+            ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
             String firstTimeConnected = producer.getConnectedSince();
             ExecutorService executor = Executors.newFixedThreadPool(5);
             byte[] msgData = "testData".getBytes();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 583026585..247a7734b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -73,7 +73,8 @@ public void testRoundRobinProducer() throws Exception {
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), 
numPartitions);
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
                 .subscriptionName("my-partitioned-subscriber").subscribe();
@@ -250,7 +251,10 @@ public void testInvalidSequence() throws Exception {
             // ok
         }
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString()).create();
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         producer.close();
 
         try {
@@ -289,7 +293,10 @@ public void testSillyUser() throws Exception {
         }
 
         try {
-            producer = 
pulsarClient.newProducer().topic(topicName.toString()).create();
+            producer = pulsarClient.newProducer().topic(topicName.toString())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
             consumer = 
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe();
             producer.send("message1".getBytes());
             producer.send("message2".getBytes());
@@ -343,7 +350,8 @@ public void testAsyncPartitionedProducerConsumer() throws 
Exception {
 
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), 
numPartitions);
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
                 
.subscriptionName("my-partitioned-subscriber").subscriptionType(SubscriptionType.Shared).subscribe();
@@ -391,7 +399,8 @@ public void 
testAsyncPartitionedProducerConsumerQueueSizeOne() throws Exception
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), 
numPartitions);
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
                 
.subscriptionName("my-partitioned-subscriber").receiverQueueSize(1).subscribe();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 8a536f2a3..fb2c2b40a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -130,10 +130,13 @@ public void testAsyncProducerAndAsyncAck(int 
batchMessageDelayMs, int ackTimeout
         Consumer<byte[]> consumer = consumerBuilder.subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/tp1/my-ns/my-topic2");
+                .topic("persistent://my-property/tp1/my-ns/my-topic2")
+                .messageRoutingMode(MessageRoutingMode.SinglePartition);
         if (batchMessageDelayMs != 0) {
             
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs,
 TimeUnit.MILLISECONDS)
                     .batchingMaxMessages(5);
+        } else {
+            producerBuilder.enableBatching(false);
         }
 
         Producer<byte[]> producer = producerBuilder.create();
@@ -185,10 +188,13 @@ public void 
testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs,
         Consumer<byte[]> consumer = consumerBuilder.subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/tp1/my-ns/my-topic2");
+                .topic("persistent://my-property/tp1/my-ns/my-topic2")
+                .messageRoutingMode(MessageRoutingMode.SinglePartition);
         if (batchMessageDelayMs != 0) {
             
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs,
 TimeUnit.MILLISECONDS)
                     .batchingMaxMessages(5);
+        } else {
+            producerBuilder.enableBatching(false);
         }
 
         Producer<byte[]> producer = producerBuilder.create();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index b5dcd87dd..ad406ce2c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -591,16 +591,22 @@ public void testSendBigMessageSizeButCompressed() throws 
Exception {
         final String topic = "persistent://my-property/my-ns/bigMsg";
 
         // (a) non-batch msg with compression
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).compressionType(CompressionType.LZ4)
-                .create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .compressionType(CompressionType.LZ4)
+            .create();
         Message<byte[]> message = MessageBuilder.create().setContent(new 
byte[PulsarDecoder.MaxMessageSize + 1])
                 .build();
         producer.send(message);
         producer.close();
 
         // (b) batch-msg
-        producer = 
pulsarClient.newProducer().topic(topic).enableBatching(true).compressionType(CompressionType.LZ4)
-                .create();
+        producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(true)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .compressionType(CompressionType.LZ4)
+            .create();
         message = MessageBuilder.create().setContent(new 
byte[PulsarDecoder.MaxMessageSize + 1]).build();
         try {
             producer.send(message);
@@ -611,7 +617,11 @@ public void testSendBigMessageSizeButCompressed() throws 
Exception {
         producer.close();
 
         // (c) non-batch msg without compression
-        producer = 
pulsarClient.newProducer().topic(topic).compressionType(CompressionType.NONE).create();
+        producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .compressionType(CompressionType.NONE)
+            .create();
         message = MessageBuilder.create().setContent(new 
byte[PulsarDecoder.MaxMessageSize + 1]).build();
         try {
             producer.send(message);
@@ -622,7 +632,11 @@ public void testSendBigMessageSizeButCompressed() throws 
Exception {
         producer.close();
 
         // (d) non-batch msg with compression and try to consume message
-        producer = 
pulsarClient.newProducer().topic(topic).compressionType(CompressionType.LZ4).create();
+        producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .compressionType(CompressionType.LZ4).create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe();
         byte[] content = new byte[PulsarDecoder.MaxMessageSize + 10];
         message = MessageBuilder.create().setContent(content).build();
@@ -911,7 +925,9 @@ public void testSendCallBack() throws Exception {
 
         final int totalMsg = 100;
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
-                .create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         for (int i = 0; i < totalMsg; i++) {
             final String message = "my-message-" + i;
             Message<byte[]> msg = 
MessageBuilder.create().setContent(message.getBytes()).build();
@@ -1294,7 +1310,10 @@ public void 
testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio
                     
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, 
TimeUnit.SECONDS).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    
.topic("persistent://my-property/my-ns/unacked-topic").create();
+                .topic("persistent://my-property/my-ns/unacked-topic")
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1423,6 +1442,8 @@ public void testUnackedBlockAtBatch(int 
batchMessageDelayMs) throws Exception {
                 producerBuidler.enableBatching(true);
                 producerBuidler.batchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
                 producerBuidler.batchingMaxMessages(5);
+            } else {
+                producerBuidler.enableBatching(false);
             }
 
             Producer<byte[]> producer = producerBuidler.create();
@@ -1868,7 +1889,9 @@ public void testSharedSamePriorityConsumer() throws 
Exception {
         Consumer<byte[]> c1 = consumerBuilder.subscribe();
         Consumer<byte[]> c2 = consumerBuilder.subscribe();
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
-                .create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         List<Future<MessageId>> futures = Lists.newArrayList();
 
         // Asynchronously produce messages
@@ -2274,8 +2297,12 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> keyMe
 
         // 2. Producer with valid key name
         Producer<byte[]> producer = pulsarClient.newProducer()
-                
.topic("persistent://my-property/use/myenc-ns/myenc-topic1").addEncryptionKey("client-rsa.pem")
-                .cryptoKeyReader(new EncKeyReader()).create();
+            .topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+            .addEncryptionKey("client-rsa.pem")
+            .cryptoKeyReader(new EncKeyReader())
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 80ac52068..36238ba77 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -122,6 +122,8 @@ public void testSyncProducerAndConsumer(int 
batchMessageDelayMs) throws Exceptio
             producerConf.setBatchingEnabled(true);
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
 
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", 
producerConf);
@@ -157,6 +159,8 @@ public void testAsyncProducerAndAsyncAck(int 
batchMessageDelayMs) throws Excepti
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
             producerConf.setBatchingEnabled(true);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", 
producerConf);
         List<Future<MessageId>> futures = Lists.newArrayList();
@@ -214,6 +218,8 @@ public void testMessageListener(int batchMessageDelayMs) 
throws Exception {
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
             producerConf.setBatchingEnabled(true);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic3", 
producerConf);
         List<Future<MessageId>> futures = Lists.newArrayList();
@@ -249,6 +255,8 @@ public void testBackoffAndReconnect(int 
batchMessageDelayMs) throws Exception {
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
             producerConf.setBatchingEnabled(true);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic4", 
producerConf);
 
@@ -299,6 +307,8 @@ public void testSendTimeout(int batchMessageDelayMs) throws 
Exception {
             producerConf.setBatchingMaxPublishDelay(2 * batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
             producerConf.setBatchingEnabled(true);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
         producerConf.setSendTimeout(1, TimeUnit.SECONDS);
 
@@ -520,6 +530,8 @@ public Void call() throws Exception {
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
             producerConf.setBatchingEnabled(true);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic7", 
producerConf);
         for (int i = 0; i < recvQueueSize; i++) {
@@ -627,6 +639,7 @@ public void testSendBigMessageSizeButCompressed() throws 
Exception {
 
         // (a) non-batch msg with compression
         ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
         producerConf.setCompressionType(CompressionType.LZ4);
         Producer producer = pulsarClient.createProducer(topic, producerConf);
         Message message = MessageBuilder.create().setContent(new 
byte[PulsarDecoder.MaxMessageSize + 1]).build();
@@ -649,6 +662,7 @@ public void testSendBigMessageSizeButCompressed() throws 
Exception {
 
         // (c) non-batch msg without compression
         producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
         producerConf.setCompressionType(CompressionType.NONE);
         producer = pulsarClient.createProducer(topic, producerConf);
         message = MessageBuilder.create().setContent(new 
byte[PulsarDecoder.MaxMessageSize + 1]).build();
@@ -662,6 +676,7 @@ public void testSendBigMessageSizeButCompressed() throws 
Exception {
 
         // (d) non-batch msg with compression and try to consume message
         producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
         producerConf.setCompressionType(CompressionType.LZ4);
         producer = pulsarClient.createProducer(topic, producerConf);
         Consumer consumer = pulsarClient.subscribe(topic, "sub1");
@@ -704,6 +719,8 @@ public void 
testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
             producerConf.setBatchingEnabled(true);
             producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
             producerConf.setBatchingMaxMessages(5);
+        } else {
+            producerConf.setBatchingEnabled(false);
         }
 
         /************ usecase-1: *************/
@@ -959,6 +976,7 @@ public void testSendCallBack() throws Exception {
 
         final int totalMsg = 100;
         final ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", 
producerConf);
         for (int i = 0; i < totalMsg; i++) {
             final String message = "my-message-" + i;
@@ -1358,6 +1376,7 @@ public void 
testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio
                     
.subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", 
conf);
 
             ProducerConfiguration producerConf = new ProducerConfiguration();
+            producerConf.setBatchingEnabled(false);
 
             Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
                     producerConf);
@@ -1494,6 +1513,8 @@ public void testUnackedBlockAtBatch(int 
batchMessageDelayMs) throws Exception {
                 producerConf.setBatchingEnabled(true);
                 producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
                 producerConf.setBatchingMaxMessages(5);
+            } else {
+                producerConf.setBatchingEnabled(false);
             }
 
             Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
@@ -1961,6 +1982,7 @@ public void testSharedSamePriorityConsumer() throws 
Exception {
         Consumer c2 = 
pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", 
"my-subscriber-name",
                 conf1);
         ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", 
producerConf);
         List<Future<MessageId>> futures = Lists.newArrayList();
 
@@ -2366,6 +2388,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> keyMe
         final int totalMsg = 10;
 
         ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(false);
 
         Message msg = null;
         Set<String> messageSet = Sets.newHashSet();
@@ -2389,6 +2412,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> keyMe
         producerConf = new ProducerConfiguration();
         producerConf.setCryptoKeyReader(new EncKeyReader());
         producerConf.addEncryptionKey("client-rsa.pem");
+        producerConf.setBatchingEnabled(false);
         Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/myenc-topic1",
                 producerConf);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index 6d6fc924c..48775e061 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -46,6 +46,7 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
@@ -89,7 +90,10 @@ public void producerSendAsync() throws PulsarClientException 
{
         final int numberOfMessages = 30;
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -181,7 +185,10 @@ public void partitionedProducerSendAsync() throws 
PulsarClientException, PulsarA
         admin.persistentTopics().createPartitionedTopic(topicName, 
numberOfPartitions);
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -230,7 +237,10 @@ public void partitionedProducerSend() throws 
PulsarClientException, PulsarAdminE
         admin.persistentTopics().createPartitionedTopic(topicName, 
numberOfPartitions);
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -279,7 +289,10 @@ public void testChecksumVersionComptability() throws 
Exception {
         final String topicName = "persistent://prop/use/ns-abc/topic1";
 
         // 1. producer connect
-        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName).create();
+        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         ProducerImpl<byte[]> producer = spy(prod);
         // return higher version compare to broker : so, it forces 
client-producer to remove checksum from payload
         doReturn(producer.brokerChecksumSupportedVersion() + 
1).when(producer).brokerChecksumSupportedVersion();
@@ -344,7 +357,10 @@ public void testChecksumReconnection() throws Exception {
         final String topicName = "persistent://prop/use/ns-abc/topic1";
 
         // 1. producer connect
-        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName).create();
+        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
         ProducerImpl<byte[]> producer = spy(prod);
         // mock: broker-doesn't support checksum (remote_version < 
brokerChecksumSupportedVersion) so, it forces
         // client-producer to perform checksum-strip from msg at reconnection
@@ -420,8 +436,12 @@ public void testCorruptMessageRemove() throws Exception {
         final String topicName = "persistent://prop/use/ns-abc/retry-topic";
 
         // 1. producer connect
-        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
-                .sendTimeout(10, TimeUnit.MINUTES).create();
+        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .sendTimeout(10, TimeUnit.MINUTES)
+            .create();
         ProducerImpl<byte[]> producer = spy(prod);
         Field producerIdField = 
ProducerImpl.class.getDeclaredField("producerId");
         producerIdField.setAccessible(true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 9624e8e54..92c2ccc4a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -32,6 +32,7 @@
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -144,11 +145,15 @@ public void testBinaryProtoToGetTopicsOfNamespace() 
throws Exception {
         int totalMessages = 30;
 
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -301,11 +306,15 @@ public void testStartEmptyPatternConsumer() throws 
Exception {
         int totalMessages = 30;
 
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -368,11 +377,15 @@ public void testAutoSubscribePatternConsumer() throws 
Exception {
         int totalMessages = 30;
 
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -415,6 +428,7 @@ public void testAutoSubscribePatternConsumer() throws 
Exception {
         String topicName4 = "persistent://my-property/my-ns/pattern-topic-4-" 
+ key;
         admin.persistentTopics().createPartitionedTopic(topicName4, 4);
         Producer<byte[]> producer4 = 
pulsarClient.newProducer().topic(topicName4)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -471,11 +485,15 @@ public void testAutoUnbubscribePatternConsumer() throws 
Exception {
         int totalMessages = 30;
 
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 053cb5ea1..8e4c7fa2d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -61,7 +61,10 @@ public void testSharedAckedNormalTopic() throws Exception {
         final int totalMessages = 15;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -156,7 +159,10 @@ public void testExclusiveAckedNormalTopic() throws 
Exception {
         final int totalMessages = 15;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -251,7 +257,10 @@ public void testFailoverAckedNormalTopic() throws 
Exception {
         final int totalMessages = 15;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -356,7 +365,8 @@ public void testSharedAckedPartitionedTopic() throws 
Exception {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index 5e7383e22..75e666ad7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -35,6 +35,7 @@
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
@@ -77,7 +78,12 @@ public void cleanup() throws Exception {
     private Set<String> publishMessages(String topic, int count) throws 
Exception {
         Set<String> keys = new HashSet<>();
 
-        try (Producer<byte[]> producer = 
pulsarClient.newProducer().maxPendingMessages(count).topic(topic).create()) {
+        try (Producer<byte[]> producer = pulsarClient.newProducer()
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .maxPendingMessages(count)
+            .topic(topic)
+            .create()) {
             Future<?> lastFuture = null;
             for (int i = 0; i < count; i++) {
                 String key = "key"+i;
@@ -232,8 +238,13 @@ public void testFlowControl() throws Exception {
     public void testBatchingExtractKeysAndIds() throws Exception {
         String topic = "persistent://my-property/my-ns/my-raw-topic";
 
-        try (Producer producer = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
-                
.enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, 
TimeUnit.HOURS).create()) {
+        try (Producer producer = pulsarClient.newProducer().topic(topic)
+            .maxPendingMessages(3)
+            .enableBatching(true)
+            .batchingMaxMessages(3)
+            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create()) {
             producer.sendAsync(MessageBuilder.create()
                                
.setKey("key1").setContent("my-content-1".getBytes()).build());
             producer.sendAsync(MessageBuilder.create()
@@ -265,8 +276,13 @@ public void testBatchingExtractKeysAndIds() throws 
Exception {
     public void testBatchingRebatch() throws Exception {
         String topic = "persistent://my-property/my-ns/my-raw-topic";
 
-        try (Producer producer = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
-                
.enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, 
TimeUnit.HOURS).create()) {
+        try (Producer producer = pulsarClient.newProducer().topic(topic)
+            .maxPendingMessages(3)
+            .enableBatching(true)
+            .batchingMaxMessages(3)
+            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create()) {
             producer.sendAsync(MessageBuilder.create()
                                
.setKey("key1").setContent("my-content-1".getBytes()).build());
             producer.sendAsync(MessageBuilder.create()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 6aa574ddf..83e70e5f8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -35,6 +35,7 @@
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -151,11 +152,15 @@ public void testSyncProducerAndConsumer() throws 
Exception {
 
         // 1. producer connect
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -212,11 +217,15 @@ public void testAsyncConsumer() throws Exception {
 
         // 1. producer connect
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -291,11 +300,15 @@ public void testConsumerUnackedRedelivery() throws 
Exception {
 
         // 1. producer connect
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
@@ -431,11 +444,15 @@ public void testSubscribeUnsubscribeSingleTopic() throws 
Exception {
 
         // 1. producer connect
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
         Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicName3)
+            .enableBatching(false)
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index e6adf5c90..42c957f09 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -64,7 +64,10 @@ public void testExclusiveSingleAckedNormalTopic() throws 
Exception {
         final int totalMessages = 10;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -119,7 +122,10 @@ public void testExclusiveCumulativeAckedNormalTopic() 
throws Exception {
         final int totalMessages = 10;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -166,7 +172,9 @@ public void testSharedSingleAckedPartitionedTopic() throws 
Exception {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -252,7 +260,9 @@ public void testFailoverSingleAckedPartitionedTopic() 
throws Exception {
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
         Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -341,7 +351,10 @@ public void testCheckUnAcknowledgedMessageTimer() throws 
PulsarClientException,
         final int totalMessages = 3;
 
         // 1. producer connect
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 2. Create consumer
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
index 6cdc5069b..2f23554c5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
@@ -26,6 +26,7 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -94,7 +95,10 @@ public void zeroQueueSizeNormalConsumer() throws 
PulsarClientException {
         final String messagePredicate = "my-message-" + key + "-";
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
@@ -128,7 +132,10 @@ public void zeroQueueSizeSharedSubscription() throws 
PulsarClientException {
         final String messagePredicate = "my-message-" + key + "-";
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         int numOfSubscribers = 4;
@@ -166,7 +173,10 @@ public void zeroQueueSizeFailoverSubscription() throws 
PulsarClientException {
         final String messagePredicate = "my-message-" + key + "-";
 
         // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // 3. Create Consumer
         ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
@@ -217,11 +227,14 @@ public void testFailedZeroQueueSizeBatchMessage() throws 
PulsarClientException {
                 .subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://prop-xyz/use/ns-abc/topic1");
+            .topic("persistent://prop-xyz/use/ns-abc/topic1")
+            .messageRoutingMode(MessageRoutingMode.SinglePartition);
 
         if (batchMessageDelayMs != 0) {
             
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs,
 TimeUnit.MILLISECONDS)
                     .batchingMaxMessages(5);
+        } else {
+            producerBuilder.enableBatching(false);
         }
 
         Producer<byte[]> producer = producerBuilder.create();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 8f64ef544..a0382bad1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -42,6 +42,7 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -94,7 +95,11 @@ public void testCompaction() throws Exception {
         final int numMessages = 20;
         final int maxKeys = 10;
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         Map<String, byte[]> expected = new HashMap<>();
         List<Pair<String, byte[]>> all = new ArrayList<>();
@@ -147,7 +152,10 @@ public void testCompaction() throws Exception {
     public void testReadCompactedBeforeCompaction() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .create();
 
         
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
 
@@ -185,7 +193,10 @@ public void testReadCompactedBeforeCompaction() throws 
Exception {
     public void testReadEntriesAfterCompaction() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .create();
 
         
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
 
@@ -214,7 +225,10 @@ public void testReadEntriesAfterCompaction() throws 
Exception {
     public void testSeekEarliestAfterCompaction() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .create();
 
         
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
         
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
@@ -253,7 +267,10 @@ public void testSeekEarliestAfterCompaction() throws 
Exception {
     public void testBrokerRestartAfterCompaction() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .create();
 
         
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
 
@@ -293,7 +310,10 @@ public void testBrokerRestartAfterCompaction() throws 
Exception {
     public void testCompactEmptyTopic() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .create();
 
         
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
 
@@ -317,7 +337,7 @@ public void testFirstMessageRetained() throws Exception {
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
             .readCompacted(true).subscribe().close();
 
-        try (Producer producer = pulsarClient.createProducer(topic)) {
+        try (Producer producer = 
pulsarClient.newProducer().topic(topic).enableBatching(false).create()) {
             producer.sendAsync(MessageBuilder.create()
                                .setKey("key1")
                                .setContent("my-message-1".getBytes()).build());
@@ -365,9 +385,14 @@ public void testBatchMessageIdsDontChange() throws 
Exception {
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
             .readCompacted(true).subscribe().close();
 
-        try (Producer producer = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
-                .enableBatching(true).batchingMaxMessages(3)
-                .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+        try (Producer producer = pulsarClient.newProducer().topic(topic)
+            .maxPendingMessages(3)
+            .enableBatching(true)
+            .batchingMaxMessages(3)
+            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create()
+        ) {
             producer.sendAsync(MessageBuilder.create()
                                .setKey("key1")
                                .setContent("my-message-1".getBytes()).build());
@@ -425,10 +450,17 @@ public void testWholeBatchCompactedOut() throws Exception 
{
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
             .readCompacted(true).subscribe().close();
 
-        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create();
-             Producer producerBatch = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
-                .enableBatching(true).batchingMaxMessages(3)
-                .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+        try (Producer producerNormal = pulsarClient.newProducer().topic(topic)
+                 .enableBatching(false)
+                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                 .create();
+             Producer producerBatch = pulsarClient.newProducer().topic(topic)
+                 .maxPendingMessages(3)
+                 .enableBatching(true)
+                 .batchingMaxMessages(3)
+                 .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                 .create()) {
             producerBatch.sendAsync(MessageBuilder.create()
                                     .setKey("key1")
                                     
.setContent("my-message-1".getBytes()).build());
@@ -533,10 +565,17 @@ public void testEmptyPayloadDeletes() throws Exception {
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
             .readCompacted(true).subscribe().close();
 
-        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create();
-             Producer producerBatch = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
-                .enableBatching(true).batchingMaxMessages(3)
-                .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+        try (Producer producerNormal = pulsarClient.newProducer()
+                 .topic(topic)
+                 .enableBatching(false)
+                 .create();
+             Producer producerBatch = pulsarClient.newProducer()
+                 .topic(topic)
+                 .maxPendingMessages(3)
+                 .enableBatching(true)
+                 .batchingMaxMessages(3)
+                 .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                 .create()) {
 
             // key0 persists through it all
             producerNormal.sendAsync(MessageBuilder.create()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 84bc3959b..f41c62c29 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -35,6 +35,7 @@
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.RawMessageImpl;
@@ -117,7 +118,10 @@ public void testCompaction() throws Exception {
         final int numMessages = 1000;
         final int maxKeys = 10;
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         Map<String, byte[]> expected = new HashMap<>();
         Random r = new Random(0);
@@ -138,7 +142,10 @@ public void testCompaction() throws Exception {
     public void testCompactAddCompact() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         Map<String, byte[]> expected = new HashMap<>();
 
@@ -168,7 +175,10 @@ public void testCompactAddCompact() throws Exception {
     public void testCompactedInOrder() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         producer.send(MessageBuilder.create()
                       .setKey("c")
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 03d82a785..a9723a00b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -115,7 +115,7 @@
      * Set the max size of the queue holding the messages pending to receive 
an acknowledgment from the broker.
      * <p>
      * When the queue is full, by default, all calls to {@link Producer#send} 
and {@link Producer#sendAsync} will fail
-     * unless blockIfQueueFull is set to true. Use {@link 
#setBlockIfQueueFull} to change the blocking behavior.
+     * unless blockIfQueueFull is set to true. Use {@link 
#blockIfQueueFull(boolean)} to change the blocking behavior.
      *
      * @param maxPendingMessages
      * @return
@@ -148,13 +148,15 @@
     /**
      * Set the message routing mode for the partitioned producer.
      *
-     * Default routing mode for messages to partition.
+     * Default routing mode is round-robin routing.
      *
      * This logic is applied when the application is not setting a key {@link 
MessageBuilder#setKey(String)} on a
      * particular message.
      *
      * @param messageRoutingMode
      *            the message routing mode
+     * @return producer builder
+     * @see MessageRoutingMode
      */
     ProducerBuilder<T> messageRoutingMode(MessageRoutingMode 
messageRoutingMode);
 
@@ -205,9 +207,13 @@
      * messages will be compressed at the batch level, leading to a much 
better compression ratio for similar headers or
      * contents.
      *
-     * When enabled default batch delay is set to 10 ms and default batch size 
is 1000 messages
+     * When enabled default batch delay is set to 1 ms and default batch size 
is 1000 messages
      *
+     * <p>Batching is enabled by default since 2.0.0.
+     *
+     * @return producer builder.
      * @see #batchingMaxPublishDelay(long, TimeUnit)
+     * @see #batchingMaxMessages(int)
      */
     ProducerBuilder<T> enableBatching(boolean enableBatching);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 49cd1d8f4..1af077be9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -184,10 +184,11 @@ public ProducerConfiguration setBlockIfQueueFull(boolean 
blockIfQueueFull) {
     }
 
     /**
-     * Set the message routing mode for the partitioned producer
+     * Set the message routing mode for the partitioned producer.
      *
-     * @param mode
-     * @return
+     * @param messageRouteMode message routing mode.
+     * @return producer configuration
+     * @see MessageRoutingMode
      */
     public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode 
messageRouteMode) {
         checkNotNull(messageRouteMode);
@@ -197,9 +198,10 @@ public ProducerConfiguration 
setMessageRoutingMode(MessageRoutingMode messageRou
     }
 
     /**
-     * Get the message routing mode for the partitioned producer
+     * Get the message routing mode for the partitioned producer.
      *
-     * @return
+     * @return message routing mode, default is round-robin routing.
+     * @see MessageRoutingMode#RoundRobinPartition
      */
     public MessageRoutingMode getMessageRoutingMode() {
         return 
MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString());
@@ -269,9 +271,12 @@ public MessageRouter getMessageRouter() {
     }
 
     /**
-     * @ return if batch messages are enabled
+     * Return the flag whether automatic message batching is enabled or not.
+     *
+     * @return true if batch messages are enabled. otherwise false.
+     * @since 2.0.0 <br>
+     *        It is enabled by default.
      */
-
     public boolean getBatchingEnabled() {
         return conf.isBatchingEnabled();
     }
@@ -290,8 +295,8 @@ public boolean getBatchingEnabled() {
      * @since 1.0.36 <br>
      *        Make sure all the consumer applications have been updated to use 
this client version, before starting to
      *        batch messages.
+     *
      */
-
     public ProducerConfiguration setBatchingEnabled(boolean 
batchMessagesEnabled) {
         conf.setBatchingEnabled(batchMessagesEnabled);
         return this;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 1449a454f..9f068091b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -50,7 +50,7 @@
     private boolean blockIfQueueFull = false;
     private int maxPendingMessages = 1000;
     private int maxPendingMessagesAcrossPartitions = 50000;
-    private MessageRoutingMode messageRoutingMode = 
MessageRoutingMode.SinglePartition;
+    private MessageRoutingMode messageRoutingMode = 
MessageRoutingMode.RoundRobinPartition;
     private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
 
     private ProducerCryptoFailureAction cryptoFailureAction = 
ProducerCryptoFailureAction.FAIL;
@@ -58,9 +58,9 @@
     @JsonIgnore
     private MessageRouter customMessageRouter = null;
 
-    private long batchingMaxPublishDelayMicros = 
TimeUnit.MILLISECONDS.toMicros(10);
+    private long batchingMaxPublishDelayMicros = 
TimeUnit.MILLISECONDS.toMicros(1);
     private int batchingMaxMessages = 1000;
-    private boolean batchingEnabled = false; // disabled by default
+    private boolean batchingEnabled = true; // enabled by default
 
     @JsonIgnore
     private CryptoKeyReader cryptoKeyReader;
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 68b789a57..c6f10613e 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -85,8 +85,11 @@ public void testProducer() throws Exception {
     public void testProducerConsumer() throws Exception {
         PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
proxyConfig.getServicePort())
                 .build();
-        Producer<byte[]> producer = 
client.newProducer().topic("persistent://sample/test/local/producer-consumer-topic")
-                .create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic("persistent://sample/test/local/producer-consumer-topic")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
 
         // Create a consumer directly attached to broker
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
@@ -116,8 +119,10 @@ public void testPartitions() throws Exception {
                 .build();
         
admin.persistentTopics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic",
 2);
 
-        Producer<byte[]> producer = 
client.newProducer().topic("persistent://sample/test/local/partitioned-topic")
-                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+        Producer<byte[]> producer = client.newProducer()
+            .topic("persistent://sample/test/local/partitioned-topic")
+            .enableBatching(false)
+            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         // Create a consumer directly attached to broker
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic")
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index c2c3ee940..5ed8adfcb 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -251,6 +251,9 @@ private void updateSentMsgStats(long msgSize, long 
latencyUsec) {
     private ProducerConfiguration getProducerConfiguration() {
         ProducerConfiguration conf = new ProducerConfiguration();
 
+        conf.setBatchingEnabled(false);
+        conf.setMessageRoutingMode(MessageRoutingMode.SinglePartition);
+
         // Set to false to prevent the server thread from being blocked if a 
lot of messages are pending.
         conf.setBlockIfQueueFull(false);
 
diff --git 
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
 
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
index 6a6130f29..108f71d80 100644
--- 
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
+++ 
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
@@ -25,6 +25,7 @@
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.tests.DockerUtils;
@@ -72,7 +73,11 @@ public void testPublishCompactAndConsumeCLI() throws 
Exception {
         try (PulsarClient client = PulsarClient.create(serviceUrl)) {
             
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
 
-            try(Producer<byte[]> producer = 
client.newProducer().topic(topic).create()) {
+            try(Producer<byte[]> producer = client.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create()) {
                 
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
                 
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
             }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to