This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push: new 708012096eb [improve][client][branch-2.11]PIP-189: No batching if only one message in batch (#18548) 708012096eb is described below commit 708012096eb781c645697a6f2b74bfc2574ff66e Author: houxiaoyu <houxia...@apache.org> AuthorDate: Mon Nov 21 16:25:49 2022 +0800 [improve][client][branch-2.11]PIP-189: No batching if only one message in batch (#18548) --- .../RGUsageMTAggrWaitForAllMsgsTest.java | 4 +- .../pulsar/broker/service/BatchMessageTest.java | 37 +++++++- .../broker/service/BrokerEntryMetadataE2ETest.java | 67 ++++++++----- .../pulsar/client/api/ClientDeduplicationTest.java | 7 +- .../apache/pulsar/client/api/TopicReaderTest.java | 6 +- .../client/cli/PulsarClientToolForceBatchNum.java | 104 +++++++++++++++++++++ .../pulsar/client/cli/PulsarClientToolTest.java | 9 +- .../client/impl/BatchMessageContainerImpl.java | 38 +++++++- .../apache/pulsar/client/impl/ProducerImpl.java | 3 + 9 files changed, 240 insertions(+), 35 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java index 27f9e905262..1acc5ad0039 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java @@ -769,8 +769,8 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { Assert.assertNotEquals(ninthPercentileValue, 0); } - // Empirically, there appears to be a 42-byte overhead for metadata, imposed by Pulsar runtime. - private static final int PER_MESSAGE_METADATA_OHEAD = 42; + // Empirically, there appears to be a 31-byte overhead for metadata, imposed by Pulsar runtime. + private static final int PER_MESSAGE_METADATA_OHEAD = 31; private static final int PUBLISH_INTERVAL_SECS = 10; private static final int NUM_PRODUCERS = 4; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 0d18e243884..e9c5032063f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; @@ -865,6 +866,37 @@ public class BatchMessageTest extends BrokerTestBase { producer.close(); } + @Test(dataProvider = "containerBuilder") + public void testBatchSendOneMessage(BatcherBuilder builder) throws Exception { + final String topicName = "persistent://prop/ns-abc/testBatchSendOneMessage-" + UUID.randomUUID(); + final String subscriptionName = "sub-1"; + + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared).subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .batchingMaxPublishDelay(1, TimeUnit.SECONDS).batchingMaxMessages(10).enableBatching(true) + .batcherBuilder(builder) + .create(); + String msg = "my-message"; + MessageId messageId = producer.newMessage().value(msg.getBytes()).property("key1", "value1").send(); + + Assert.assertTrue(messageId instanceof MessageIdImpl); + Assert.assertFalse(messageId instanceof BatchMessageIdImpl); + + Message<byte[]> received = consumer.receive(); + assertEquals(received.getSequenceId(), 0); + consumer.acknowledge(received); + + Assert.assertEquals(new String(received.getData()), msg); + Assert.assertFalse(received.getProperties().isEmpty()); + Assert.assertEquals(received.getProperties().get("key1"), "value1"); + Assert.assertFalse(received.getMessageId() instanceof BatchMessageIdImpl); + + producer.close(); + consumer.close(); + } + @Test(dataProvider = "containerBuilder") public void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws Exception { @@ -1034,7 +1066,10 @@ public class BatchMessageTest extends BrokerTestBase { if (enableBatch) { // only ack messages which batch index < 2, which means we will not to ack the // whole batch for the batch that with more than 2 messages - if (((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) { + if ((message.getMessageId() instanceof BatchMessageIdImpl) + && ((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) { + consumer.acknowledgeAsync(message).get(); + } else if (!(message.getMessageId() instanceof BatchMessageIdImpl)){ consumer.acknowledgeAsync(message).get(); } } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java index 49b4742b71d..33785ba8795 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java @@ -21,7 +21,9 @@ package org.apache.pulsar.broker.service; import static org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import java.time.Duration; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -36,6 +38,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; +import org.apache.pulsar.common.util.FutureUtil; import org.assertj.core.util.Sets; import org.awaitility.Awaitility; import org.testng.Assert; @@ -211,57 +214,75 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase { final String topic = newTopicName(); final String subscription = "my-sub"; final long eventTime= 200; + final int msgNum = 2; @Cleanup Producer<byte[]> producer = pulsarClient.newProducer() .topic(topic) + // make sure 2 messages in one batch, because if only one message in batch, + // producer will not send batched messages + .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS) + .batchingMaxMessages(msgNum) + .batchingMaxBytes(Integer.MAX_VALUE) .enableBatching(true) .create(); long sendTime = System.currentTimeMillis(); - // send message which is batch message and only contains one message, so do not set the deliverAtTime - MessageIdImpl messageId = (MessageIdImpl) producer.newMessage() + // send message which is batch message, so do not set the deliverAtTime + List<CompletableFuture<MessageId>> messageIdsFuture = new ArrayList<>(msgNum); + for (int i = 0; i < msgNum; ++i) { + CompletableFuture<MessageId> messageId = producer.newMessage() .eventTime(eventTime) - .value(("hello").getBytes()) - .send(); + .value(("hello" + i).getBytes()) + .sendAsync(); + messageIdsFuture.add(messageId); + } + FutureUtil.waitForAll(messageIdsFuture); // 1. test for peekMessages admin.topics().createSubscription(topic, subscription, MessageId.earliest); - final List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, 1); - Assert.assertEquals(messages.size(), 1); - - MessageImpl message = (MessageImpl) messages.get(0); - Assert.assertEquals(message.getData(), ("hello").getBytes()); - Assert.assertTrue(message.getPublishTime() >= sendTime); - BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata(); - Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime); - Assert.assertEquals(entryMetadata.getIndex(), 0); - System.out.println(message.getProperties()); - Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1); - // make sure BATCH_SIZE_HEADER > 0 - Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0); + final List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, msgNum); + Assert.assertEquals(messages.size(), msgNum); + + MessageImpl message; + BrokerEntryMetadata entryMetadata; + for (int i = 0; i < msgNum; ++i) { + message = (MessageImpl) messages.get(i); + Assert.assertEquals(message.getData(), ("hello" + i).getBytes()); + Assert.assertTrue(message.getPublishTime() >= sendTime); + entryMetadata = message.getBrokerEntryMetadata(); + Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime); + Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1); + System.out.println(message.getProperties()); + Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum); + // make sure BATCH_SIZE_HEADER > 0 + Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0); + } + // getMessagesById and examineMessage only return the first messages in the batch // 2. test for getMessagesById + MessageIdImpl messageId = (MessageIdImpl) messageIdsFuture.get(0).get(); message = (MessageImpl) admin.topics().getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId()); - Assert.assertEquals(message.getData(), ("hello").getBytes()); + // getMessagesById return the first message in the batch + Assert.assertEquals(message.getData(), ("hello" + 0).getBytes()); Assert.assertTrue(message.getPublishTime() >= sendTime); entryMetadata = message.getBrokerEntryMetadata(); Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime); - Assert.assertEquals(entryMetadata.getIndex(), 0); + Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1); System.out.println(message.getProperties()); - Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1); + Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum); // make sure BATCH_SIZE_HEADER > 0 Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0); // 3. test for examineMessage message = (MessageImpl) admin.topics().examineMessage(topic, "earliest", 1); - Assert.assertEquals(message.getData(), ("hello").getBytes()); + Assert.assertEquals(message.getData(), ("hello" + 0).getBytes()); Assert.assertTrue(message.getPublishTime() >= sendTime); entryMetadata = message.getBrokerEntryMetadata(); Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime); - Assert.assertEquals(entryMetadata.getIndex(), 0); + Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1); System.out.println(message.getProperties()); - Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1); + Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum); // make sure BATCH_SIZE_HEADER > 0 Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java index 52017444a2b..c8acc7d46f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; @@ -361,8 +362,10 @@ public class ClientDeduplicationTest extends ProducerConsumerBase { for (int i = 0; i < 5; i++) { // Currently sending a duplicated message won't throw an exception. Instead, an invalid result is returned. final MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send(); - assertTrue(messageId instanceof BatchMessageIdImpl); - final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId; + // a duplicated message will send in a single batch, that will perform as a non-batched sending + assertTrue(messageId instanceof MessageIdImpl); + assertFalse(messageId instanceof BatchMessageIdImpl); + final MessageIdImpl messageIdImpl = (MessageIdImpl) messageId; assertEquals(messageIdImpl.getLedgerId(), -1L); assertEquals(messageIdImpl.getEntryId(), -1L); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 8b533b5b450..72252309e65 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -1080,7 +1080,7 @@ public class TopicReaderTest extends ProducerConsumerBase { } @Test(timeOut = 20000) - public void testHasMessageAvailableWithBatch() throws Exception { + public void testHasMessageAvailable() throws Exception { final String topicName = "persistent://my-property/my-ns/testHasMessageAvailableWithBatch"; final int numOfMessage = 10; @@ -1092,11 +1092,11 @@ public class TopicReaderTest extends ProducerConsumerBase { //For batch-messages with single message, the type of client messageId should be the same as that of broker MessageIdImpl messageId = (MessageIdImpl) producer.send("msg".getBytes()); - assertTrue(messageId instanceof MessageIdImpl); + assertFalse(messageId instanceof BatchMessageIdImpl); ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName) .startMessageId(messageId).startMessageIdInclusive().create(); MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId(); - assertTrue(messageId instanceof BatchMessageIdImpl); + assertFalse(lastMsgId instanceof BatchMessageIdImpl); assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId()); assertEquals(lastMsgId.getEntryId(), messageId.getEntryId()); reader.close(); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java new file mode 100644 index 00000000000..b9f7b3f5e6f --- /dev/null +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.cli; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.mockito.stubbing.Answer; +import org.testng.Assert; + +/** + * An implement of {@link PulsarClientTool} for test, which will publish messages iff there is enough messages + * in the batch. + */ +public class PulsarClientToolForceBatchNum extends PulsarClientTool{ + private final String topic; + private final int batchNum; + + /** + * + * @param properties properties + * @param topic topic + * @param batchNum iff there is batchNum messages in the batch, the producer will flush and send. + */ + public PulsarClientToolForceBatchNum(Properties properties, String topic, int batchNum) { + super(properties); + this.topic = topic; + this.batchNum = batchNum; + } + + @Override + protected void initJCommander() { + super.initJCommander(); + produceCommand = new CmdProduce() { + @Override + public void updateConfig(ClientBuilder newBuilder, Authentication authentication, String serviceURL) { + try { + super.updateConfig(mockClientBuilder(newBuilder), authentication, serviceURL); + } catch (Exception e) { + Assert.fail("update config fail " + e.getMessage()); + } + } + }; + jcommander.addCommand("produce", produceCommand); + } + + private ClientBuilder mockClientBuilder(ClientBuilder newBuilder) throws Exception { + PulsarClientImpl client = (PulsarClientImpl) newBuilder.build(); + ProducerBuilder<byte[]> producerBuilder = client.newProducer() + .batchingMaxBytes(Integer.MAX_VALUE) + .batchingMaxMessages(batchNum) + .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS) + .topic(topic); + Producer<byte[]> producer = producerBuilder.create(); + + PulsarClientImpl mockClient = spy(client); + ProducerBuilder<byte[]> mockProducerBuilder = spy(producerBuilder); + Producer<byte[]> mockProducer = spy(producer); + ClientBuilder mockClientBuilder = spy(newBuilder); + + doAnswer((Answer<TypedMessageBuilder>) invocation -> { + TypedMessageBuilder typedMessageBuilder = spy((TypedMessageBuilder) invocation.callRealMethod()); + doAnswer((Answer<MessageId>) invocation1 -> { + TypedMessageBuilder mock = ((TypedMessageBuilder) invocation1.getMock()); + // using sendAsync() to replace send() + mock.sendAsync(); + return null; + }).when(typedMessageBuilder).send(); + return typedMessageBuilder; + }).when(mockProducer).newMessage(); + + doReturn(mockProducer).when(mockProducerBuilder).create(); + doReturn(mockProducerBuilder).when(mockClient).newProducer(any(Schema.class)); + doReturn(mockClient).when(mockClientBuilder).build(); + return mockClientBuilder; + } +} diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java index 2fae6330843..b1aa0835756 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java @@ -244,16 +244,19 @@ public class PulsarClientToolTest extends BrokerTestBase { properties.setProperty("useTls", "false"); final String topicName = getTopicWithRandomSuffix("disable-batching"); - final int numberOfMessages = 5; + // `numberOfMessages` should be an even number, because we set `batchNum` as 2, make sure batch and non batch + // messages in the same batch + final int numberOfMessages = 6; + final int batchNum = 2; @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe(); - PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties); + PulsarClientTool pulsarClientTool1 = new PulsarClientToolForceBatchNum(properties, topicName, batchNum); String[] args1 = {"produce", "-m", "batched", "-n", Integer.toString(numberOfMessages), topicName}; Assert.assertEquals(pulsarClientTool1.run(args1), 0); - PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties); + PulsarClientTool pulsarClientTool2 = new PulsarClientToolForceBatchNum(properties, topicName, batchNum); String[] args2 = {"produce", "-m", "non-batched", "-n", Integer.toString(numberOfMessages), "-db", topicName}; Assert.assertEquals(pulsarClientTool2.run(args2), 0); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 43c229d6e81..2d91aafb7e3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -139,8 +139,12 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { MessageImpl<?> msg = messages.get(i); msg.getDataBuffer().markReaderIndex(); try { - batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload( + if (n == 1) { + batchedMessageMetadataAndPayload.writeBytes(msg.getDataBuffer()); + } else { + batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload( msg.getMessageBuilder(), msg.getDataBuffer(), batchedMessageMetadataAndPayload); + } } catch (Throwable th) { // serializing batch message can corrupt the index of message and batch-message. Reset the index so, // next iteration doesn't send corrupt message to broker. @@ -213,6 +217,38 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { @Override public OpSendMsg createOpSendMsg() throws IOException { + if (messages.size() == 1) { + messageMetadata.clear(); + messageMetadata.copyFrom(messages.get(0).getMessageBuilder()); + ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); + ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(), + 1, messageMetadata, encryptedPayload); + final OpSendMsg op; + + // Shouldn't call create(MessageImpl<?> msg, ByteBufPair cmd, long sequenceId, SendCallback callback), + // otherwise it will bring message out of order problem. + // Because when invoke `ProducerImpl.processOpSendMsg` on flush, + // if `op.msg != null && isBatchMessagingEnabled()` checks true, it will call `batchMessageAndSend` to flush + // messageContainers before publishing this one-batch message. + op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(), firstCallback); + + // NumMessagesInBatch and BatchSizeByte will not be serialized to the binary cmd. It's just useful for the + // ProducerStats + op.setNumMessagesInBatch(1); + op.setBatchSizeByte(encryptedPayload.readableBytes()); + + // handle mgs size check as non-batched in `ProducerImpl.isMessageSizeExceeded` + if (op.getMessageHeaderAndPayloadSize() > ClientCnx.getMaxMessageSize()) { + producer.semaphoreRelease(1); + producer.client.getMemoryLimitController().releaseMemory(messages.get(0).getUncompressedSize()); + discard(new PulsarClientException.InvalidMessageException( + "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes")); + return null; + } + lowestSequenceId = -1L; + return op; + } + ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { producer.semaphoreRelease(messages.size()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index be411cfa4bb..6f7d7e6a148 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1488,6 +1488,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne void setMessageId(long ledgerId, long entryId, int partitionIndex) { if (msg != null) { msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex)); + } else if (msgs.size() == 1) { + // If there is only one message in batch, the producer will publish messages like non-batch + msgs.get(0).setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex)); } else { for (int batchIndex = 0; batchIndex < msgs.size(); batchIndex++) { msgs.get(batchIndex)