This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 708012096eb [improve][client][branch-2.11]PIP-189: No batching if only 
one message in batch (#18548)
708012096eb is described below

commit 708012096eb781c645697a6f2b74bfc2574ff66e
Author: houxiaoyu <houxia...@apache.org>
AuthorDate: Mon Nov 21 16:25:49 2022 +0800

    [improve][client][branch-2.11]PIP-189: No batching if only one message in 
batch (#18548)
---
 .../RGUsageMTAggrWaitForAllMsgsTest.java           |   4 +-
 .../pulsar/broker/service/BatchMessageTest.java    |  37 +++++++-
 .../broker/service/BrokerEntryMetadataE2ETest.java |  67 ++++++++-----
 .../pulsar/client/api/ClientDeduplicationTest.java |   7 +-
 .../apache/pulsar/client/api/TopicReaderTest.java  |   6 +-
 .../client/cli/PulsarClientToolForceBatchNum.java  | 104 +++++++++++++++++++++
 .../pulsar/client/cli/PulsarClientToolTest.java    |   9 +-
 .../client/impl/BatchMessageContainerImpl.java     |  38 +++++++-
 .../apache/pulsar/client/impl/ProducerImpl.java    |   3 +
 9 files changed, 240 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 27f9e905262..1acc5ad0039 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -769,8 +769,8 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
         Assert.assertNotEquals(ninthPercentileValue, 0);
     }
 
-    // Empirically, there appears to be a 42-byte overhead for metadata, 
imposed by Pulsar runtime.
-    private static final int PER_MESSAGE_METADATA_OHEAD = 42;
+    // Empirically, there appears to be a 31-byte overhead for metadata, 
imposed by Pulsar runtime.
+    private static final int PER_MESSAGE_METADATA_OHEAD = 31;
 
     private static final int PUBLISH_INTERVAL_SECS = 10;
     private static final int NUM_PRODUCERS = 4;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 0d18e243884..e9c5032063f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
@@ -865,6 +866,37 @@ public class BatchMessageTest extends BrokerTestBase {
         producer.close();
     }
 
+    @Test(dataProvider = "containerBuilder")
+    public void testBatchSendOneMessage(BatcherBuilder builder) throws 
Exception {
+        final String topicName = 
"persistent://prop/ns-abc/testBatchSendOneMessage-" + UUID.randomUUID();
+        final String subscriptionName = "sub-1";
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .batchingMaxPublishDelay(1, 
TimeUnit.SECONDS).batchingMaxMessages(10).enableBatching(true)
+            .batcherBuilder(builder)
+            .create();
+        String msg = "my-message";
+        MessageId messageId = 
producer.newMessage().value(msg.getBytes()).property("key1", "value1").send();
+
+        Assert.assertTrue(messageId instanceof MessageIdImpl);
+        Assert.assertFalse(messageId instanceof BatchMessageIdImpl);
+
+        Message<byte[]> received = consumer.receive();
+        assertEquals(received.getSequenceId(), 0);
+        consumer.acknowledge(received);
+
+        Assert.assertEquals(new String(received.getData()), msg);
+        Assert.assertFalse(received.getProperties().isEmpty());
+        Assert.assertEquals(received.getProperties().get("key1"), "value1");
+        Assert.assertFalse(received.getMessageId() instanceof 
BatchMessageIdImpl);
+
+        producer.close();
+        consumer.close();
+    }
+
     @Test(dataProvider = "containerBuilder")
     public void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws 
Exception {
 
@@ -1034,7 +1066,10 @@ public class BatchMessageTest extends BrokerTestBase {
             if (enableBatch) {
                 // only ack messages which batch index < 2, which means we 
will not to ack the
                 // whole batch for the batch that with more than 2 messages
-                if (((BatchMessageIdImpl) 
message.getMessageId()).getBatchIndex() < 2) {
+                if ((message.getMessageId() instanceof BatchMessageIdImpl)
+                    && ((BatchMessageIdImpl) 
message.getMessageId()).getBatchIndex() < 2) {
+                    consumer.acknowledgeAsync(message).get();
+                } else if (!(message.getMessageId() instanceof 
BatchMessageIdImpl)){
                     consumer.acknowledgeAsync(message).get();
                 }
             } else {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 49b4742b71d..33785ba8795 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.broker.service;
 import static 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -36,6 +38,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.assertj.core.util.Sets;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -211,57 +214,75 @@ public class BrokerEntryMetadataE2ETest extends 
BrokerTestBase {
         final String topic = newTopicName();
         final String subscription = "my-sub";
         final long eventTime= 200;
+        final int msgNum = 2;
 
         @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
+                 // make sure 2 messages in one batch, because if only one 
message in batch,
+                 // producer will not send batched messages
+                .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+                .batchingMaxMessages(msgNum)
+                .batchingMaxBytes(Integer.MAX_VALUE)
                 .enableBatching(true)
                 .create();
 
         long sendTime = System.currentTimeMillis();
-        // send message which is batch message and only contains one message, 
so do not set the deliverAtTime
-        MessageIdImpl messageId  = (MessageIdImpl) producer.newMessage()
+        // send message which is batch message, so do not set the deliverAtTime
+        List<CompletableFuture<MessageId>> messageIdsFuture = new 
ArrayList<>(msgNum);
+        for (int i = 0; i < msgNum; ++i) {
+            CompletableFuture<MessageId> messageId = producer.newMessage()
                 .eventTime(eventTime)
-                .value(("hello").getBytes())
-                .send();
+                .value(("hello" + i).getBytes())
+                .sendAsync();
+            messageIdsFuture.add(messageId);
+        }
+        FutureUtil.waitForAll(messageIdsFuture);
 
         // 1. test for peekMessages
         admin.topics().createSubscription(topic, subscription, 
MessageId.earliest);
-        final List<Message<byte[]>> messages = 
admin.topics().peekMessages(topic, subscription, 1);
-        Assert.assertEquals(messages.size(), 1);
-
-        MessageImpl message = (MessageImpl) messages.get(0);
-        Assert.assertEquals(message.getData(), ("hello").getBytes());
-        Assert.assertTrue(message.getPublishTime() >= sendTime);
-        BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
-        Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
-        Assert.assertEquals(entryMetadata.getIndex(), 0);
-        System.out.println(message.getProperties());
-        
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
-        // make sure BATCH_SIZE_HEADER > 0
-        
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
+        final List<Message<byte[]>> messages = 
admin.topics().peekMessages(topic, subscription, msgNum);
+        Assert.assertEquals(messages.size(), msgNum);
+
+        MessageImpl message;
+        BrokerEntryMetadata entryMetadata;
+        for (int i = 0; i < msgNum; ++i) {
+            message = (MessageImpl) messages.get(i);
+            Assert.assertEquals(message.getData(), ("hello" + i).getBytes());
+            Assert.assertTrue(message.getPublishTime() >= sendTime);
+            entryMetadata = message.getBrokerEntryMetadata();
+            Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
+            Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
+            System.out.println(message.getProperties());
+            
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 
msgNum);
+            // make sure BATCH_SIZE_HEADER > 0
+            
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
+        }
 
+        // getMessagesById and examineMessage only return the first messages 
in the batch
         // 2. test for getMessagesById
+        MessageIdImpl messageId = (MessageIdImpl) 
messageIdsFuture.get(0).get();
         message = (MessageImpl) admin.topics().getMessageById(topic, 
messageId.getLedgerId(), messageId.getEntryId());
-        Assert.assertEquals(message.getData(), ("hello").getBytes());
+        // getMessagesById return the first message in the batch
+        Assert.assertEquals(message.getData(), ("hello" + 0).getBytes());
         Assert.assertTrue(message.getPublishTime() >= sendTime);
         entryMetadata = message.getBrokerEntryMetadata();
         Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
-        Assert.assertEquals(entryMetadata.getIndex(), 0);
+        Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
         System.out.println(message.getProperties());
-        
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
+        
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 
msgNum);
         // make sure BATCH_SIZE_HEADER > 0
         
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
 
         // 3. test for examineMessage
         message = (MessageImpl) admin.topics().examineMessage(topic, 
"earliest", 1);
-        Assert.assertEquals(message.getData(), ("hello").getBytes());
+        Assert.assertEquals(message.getData(), ("hello" + 0).getBytes());
         Assert.assertTrue(message.getPublishTime() >= sendTime);
         entryMetadata = message.getBrokerEntryMetadata();
         Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
-        Assert.assertEquals(entryMetadata.getIndex(), 0);
+        Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
         System.out.println(message.getProperties());
-        
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
+        
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 
msgNum);
         // make sure BATCH_SIZE_HEADER > 0
         
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index 52017444a2b..c8acc7d46f8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
@@ -361,8 +362,10 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
         for (int i = 0; i < 5; i++) {
             // Currently sending a duplicated message won't throw an 
exception. Instead, an invalid result is returned.
             final MessageId messageId = 
producer.newMessage().value("msg").sequenceId(i).send();
-            assertTrue(messageId instanceof BatchMessageIdImpl);
-            final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) 
messageId;
+            // a duplicated message will send in a single batch, that will 
perform as a non-batched sending
+            assertTrue(messageId instanceof MessageIdImpl);
+            assertFalse(messageId instanceof BatchMessageIdImpl);
+            final MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
             assertEquals(messageIdImpl.getLedgerId(), -1L);
             assertEquals(messageIdImpl.getEntryId(), -1L);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 8b533b5b450..72252309e65 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -1080,7 +1080,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
     }
 
     @Test(timeOut = 20000)
-    public void testHasMessageAvailableWithBatch() throws Exception {
+    public void testHasMessageAvailable() throws Exception {
         final String topicName = 
"persistent://my-property/my-ns/testHasMessageAvailableWithBatch";
         final int numOfMessage = 10;
 
@@ -1092,11 +1092,11 @@ public class TopicReaderTest extends 
ProducerConsumerBase {
 
         //For batch-messages with single message, the type of client messageId 
should be the same as that of broker
         MessageIdImpl messageId = (MessageIdImpl) 
producer.send("msg".getBytes());
-        assertTrue(messageId instanceof MessageIdImpl);
+        assertFalse(messageId instanceof BatchMessageIdImpl);
         ReaderImpl<byte[]> reader = 
(ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
                 .startMessageId(messageId).startMessageIdInclusive().create();
         MessageIdImpl lastMsgId = (MessageIdImpl) 
reader.getConsumer().getLastMessageId();
-        assertTrue(messageId instanceof BatchMessageIdImpl);
+        assertFalse(lastMsgId instanceof BatchMessageIdImpl);
         assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
         assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
         reader.close();
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
new file mode 100644
index 00000000000..b9f7b3f5e6f
--- /dev/null
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.cli;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+
+/**
+ * An implement of {@link PulsarClientTool} for test, which will publish 
messages iff there is enough messages
+ * in the batch.
+ */
+public class PulsarClientToolForceBatchNum extends PulsarClientTool{
+    private final String topic;
+    private final int batchNum;
+
+    /**
+     *
+     * @param properties properties
+     * @param topic topic
+     * @param batchNum iff there is batchNum messages in the batch, the 
producer will flush and send.
+     */
+    public PulsarClientToolForceBatchNum(Properties properties, String topic, 
int batchNum) {
+        super(properties);
+        this.topic = topic;
+        this.batchNum = batchNum;
+    }
+
+    @Override
+    protected void initJCommander() {
+        super.initJCommander();
+        produceCommand = new CmdProduce() {
+            @Override
+            public void updateConfig(ClientBuilder newBuilder, Authentication 
authentication, String serviceURL) {
+                try {
+                    super.updateConfig(mockClientBuilder(newBuilder), 
authentication, serviceURL);
+                } catch (Exception e) {
+                    Assert.fail("update config fail " + e.getMessage());
+                }
+            }
+        };
+        jcommander.addCommand("produce", produceCommand);
+    }
+
+    private ClientBuilder mockClientBuilder(ClientBuilder newBuilder) throws 
Exception {
+        PulsarClientImpl client = (PulsarClientImpl) newBuilder.build();
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer()
+            .batchingMaxBytes(Integer.MAX_VALUE)
+            .batchingMaxMessages(batchNum)
+            .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+            .topic(topic);
+        Producer<byte[]> producer = producerBuilder.create();
+
+        PulsarClientImpl mockClient = spy(client);
+        ProducerBuilder<byte[]> mockProducerBuilder = spy(producerBuilder);
+        Producer<byte[]> mockProducer = spy(producer);
+        ClientBuilder mockClientBuilder = spy(newBuilder);
+
+        doAnswer((Answer<TypedMessageBuilder>) invocation -> {
+            TypedMessageBuilder typedMessageBuilder = 
spy((TypedMessageBuilder) invocation.callRealMethod());
+            doAnswer((Answer<MessageId>) invocation1 -> {
+                TypedMessageBuilder mock = ((TypedMessageBuilder) 
invocation1.getMock());
+                // using sendAsync() to replace send()
+                mock.sendAsync();
+                return null;
+            }).when(typedMessageBuilder).send();
+            return typedMessageBuilder;
+        }).when(mockProducer).newMessage();
+
+        doReturn(mockProducer).when(mockProducerBuilder).create();
+        
doReturn(mockProducerBuilder).when(mockClient).newProducer(any(Schema.class));
+        doReturn(mockClient).when(mockClientBuilder).build();
+        return mockClientBuilder;
+    }
+}
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 2fae6330843..b1aa0835756 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -244,16 +244,19 @@ public class PulsarClientToolTest extends BrokerTestBase {
         properties.setProperty("useTls", "false");
 
         final String topicName = getTopicWithRandomSuffix("disable-batching");
-        final int numberOfMessages = 5;
+        // `numberOfMessages` should be an even number, because we set 
`batchNum` as 2, make sure batch and non batch
+        // messages in the same batch
+        final int numberOfMessages = 6;
+        final int batchNum = 2;
 
         @Cleanup
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
 
-        PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties);
+        PulsarClientTool pulsarClientTool1 = new 
PulsarClientToolForceBatchNum(properties, topicName, batchNum);
         String[] args1 = {"produce", "-m", "batched", "-n", 
Integer.toString(numberOfMessages), topicName};
         Assert.assertEquals(pulsarClientTool1.run(args1), 0);
 
-        PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties);
+        PulsarClientTool pulsarClientTool2 = new 
PulsarClientToolForceBatchNum(properties, topicName, batchNum);
         String[] args2 = {"produce", "-m", "non-batched", "-n", 
Integer.toString(numberOfMessages), "-db", topicName};
         Assert.assertEquals(pulsarClientTool2.run(args2), 0);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 43c229d6e81..2d91aafb7e3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -139,8 +139,12 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
             MessageImpl<?> msg = messages.get(i);
             msg.getDataBuffer().markReaderIndex();
             try {
-                batchedMessageMetadataAndPayload = 
Commands.serializeSingleMessageInBatchWithPayload(
+                if (n == 1) {
+                    
batchedMessageMetadataAndPayload.writeBytes(msg.getDataBuffer());
+                } else  {
+                    batchedMessageMetadataAndPayload = 
Commands.serializeSingleMessageInBatchWithPayload(
                         msg.getMessageBuilder(), msg.getDataBuffer(), 
batchedMessageMetadataAndPayload);
+                }
             } catch (Throwable th) {
                 // serializing batch message can corrupt the index of message 
and batch-message. Reset the index so,
                 // next iteration doesn't send corrupt message to broker.
@@ -213,6 +217,38 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
 
     @Override
     public OpSendMsg createOpSendMsg() throws IOException {
+        if (messages.size() == 1) {
+            messageMetadata.clear();
+            messageMetadata.copyFrom(messages.get(0).getMessageBuilder());
+            ByteBuf encryptedPayload = 
producer.encryptMessage(messageMetadata, 
getCompressedBatchMetadataAndPayload());
+            ByteBufPair cmd = producer.sendMessage(producer.producerId, 
messageMetadata.getSequenceId(),
+                1, messageMetadata, encryptedPayload);
+            final OpSendMsg op;
+
+            // Shouldn't call create(MessageImpl<?> msg, ByteBufPair cmd, long 
sequenceId, SendCallback callback),
+            // otherwise it will bring message out of order problem.
+            // Because when invoke `ProducerImpl.processOpSendMsg` on flush,
+            // if `op.msg != null && isBatchMessagingEnabled()` checks true, 
it will call `batchMessageAndSend` to flush
+            // messageContainers before publishing this one-batch message.
+            op = OpSendMsg.create(messages, cmd, 
messageMetadata.getSequenceId(), firstCallback);
+
+            // NumMessagesInBatch and BatchSizeByte will not be serialized to 
the binary cmd. It's just useful for the
+            // ProducerStats
+            op.setNumMessagesInBatch(1);
+            op.setBatchSizeByte(encryptedPayload.readableBytes());
+
+            // handle mgs size check as non-batched in 
`ProducerImpl.isMessageSizeExceeded`
+            if (op.getMessageHeaderAndPayloadSize() > 
ClientCnx.getMaxMessageSize()) {
+                producer.semaphoreRelease(1);
+                
producer.client.getMemoryLimitController().releaseMemory(messages.get(0).getUncompressedSize());
+                discard(new PulsarClientException.InvalidMessageException(
+                    "Message size is bigger than " + 
ClientCnx.getMaxMessageSize() + " bytes"));
+                return null;
+            }
+            lowestSequenceId = -1L;
+            return op;
+        }
+
         ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, 
getCompressedBatchMetadataAndPayload());
         if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
             producer.semaphoreRelease(messages.size());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index be411cfa4bb..6f7d7e6a148 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1488,6 +1488,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         void setMessageId(long ledgerId, long entryId, int partitionIndex) {
             if (msg != null) {
                 msg.setMessageId(new MessageIdImpl(ledgerId, entryId, 
partitionIndex));
+            } else if (msgs.size() == 1) {
+                // If there is only one message in batch, the producer will 
publish messages like non-batch
+                msgs.get(0).setMessageId(new MessageIdImpl(ledgerId, entryId, 
partitionIndex));
             } else {
                 for (int batchIndex = 0; batchIndex < msgs.size(); 
batchIndex++) {
                     msgs.get(batchIndex)

Reply via email to