This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new baf155f  [ClientAPI]Fix hasMessageAvailable() (#6362)
baf155f is described below

commit baf155f4a41de959145841e4cd922e89363e9292
Author: Yijie Shen <henry.yijies...@gmail.com>
AuthorDate: Tue Mar 3 15:04:49 2020 +0800

    [ClientAPI]Fix hasMessageAvailable() (#6362)
    
    Fixes #6333
    
    Previously, `hasMoreMessages` was tested against:
    ```
    return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0
                    && incomingMessages.size() > 0;
    ```
    However, `incomingMessages` could be empty when the consumer/reader has just started and hasn't received any messages yet.
    
    In this PR, the last entry is retrieved and decoded to get its message metadata, which is used to populate the batchIndex field.
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 86 ++++++++++++++++---
 .../pulsar/compaction/TwoPhaseCompactor.java       |  6 +-
 .../apache/pulsar/client/api/TopicReaderTest.java  | 75 ++++++++++++++++-
 .../org/apache/pulsar/client/impl/ReaderTest.java  |  4 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 98 ++++++++++++++++------
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  2 +-
 .../pulsar/common/api/raw/RawMessageImpl.java      |  3 +
 7 files changed, 233 insertions(+), 41 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bef63fc..4497c97 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -46,7 +46,11 @@ import java.util.stream.Collectors;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.commons.lang3.StringUtils;
@@ -59,6 +63,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyExcep
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.web.RestException;
@@ -1396,22 +1401,83 @@ public class ServerCnx extends PulsarHandler {
             Topic topic = consumer.getSubscription().getTopic();
             Position position = topic.getLastMessageId();
             int partitionIndex = TopicName.getPartitionIndex(topic.getName());
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex 
{}", remoteAddress,
-                    topic.getName(), consumer.getSubscription().getName(), 
position, partitionIndex);
-            }
-            MessageIdData messageId = MessageIdData.newBuilder()
-                .setLedgerId(((PositionImpl)position).getLedgerId())
-                .setEntryId(((PositionImpl)position).getEntryId())
-                .setPartition(partitionIndex)
-                .build();
 
-            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
messageId));
+            getLargestBatchIndexWhenPossible(
+                    topic,
+                    (PositionImpl) position,
+                    partitionIndex,
+                    requestId,
+                    consumer.getSubscription().getName());
+
         } else {
             
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), 
ServerError.MetadataError, "Consumer not found"));
         }
     }
 
+    /**
+     * Answers a GetLastMessageId request, adding the batch index of the last entry when that entry can be read.
+     *
+     * <p>If {@code position} does not point at a valid entry (entry id {@code -1}), the position is sent back
+     * unchanged and nothing is read. Otherwise the entry is read from the managed ledger, the number of messages
+     * in its batch is parsed from the message metadata, and the response carries {@code batchSize - 1} as the
+     * batch index ({@code -1} when the metadata reports at most one message). A failed read or parse is reported
+     * to the client as a {@code MetadataError}.
+     *
+     * @param topic            topic the request targets; it is cast to {@link PersistentTopic}
+     * @param position         last position of the topic
+     * @param partitionIndex   partition index placed in the returned message id
+     * @param requestId        id of the request being answered
+     * @param subscriptionName subscription name, used for debug logging only
+     */
+    private void getLargestBatchIndexWhenPossible(
+            Topic topic,
+            PositionImpl position,
+            int partitionIndex,
+            long requestId,
+            String subscriptionName) {
+
+        PersistentTopic persistentTopic = (PersistentTopic) topic;
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+
+        // If it's not pointing to a valid entry, respond messageId of the current position.
+        if (position.getEntryId() == -1) {
+            MessageIdData messageId = MessageIdData.newBuilder()
+                    .setLedgerId(position.getLedgerId())
+                    .setEntryId(position.getEntryId())
+                    .setPartition(partitionIndex).build();
+
+            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
+            // The request has been answered; without this return the invalid position would be read below
+            // and a second response would be written for the same request id.
+            return;
+        }
+
+        // For a valid position, we read the entry out and parse the batch size from its metadata.
+        CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
+        ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object readCtx) {
+                entryFuture.complete(entry);
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, Object readCtx) {
+                entryFuture.completeExceptionally(exception);
+            }
+        }, null);
+
+        CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
+            try {
+                MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
+                return metadata.getNumMessagesInBatch();
+            } finally {
+                // Release the entry even when its metadata cannot be parsed.
+                entry.release();
+            }
+        });
+
+        batchSizeFuture.whenComplete((batchSize, e) -> {
+            if (e != null) {
+                ctx.writeAndFlush(Commands.newError(
+                        requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage()));
+            } else {
+                // A non-batched entry reports at most one message; -1 marks "no batch index".
+                int largestBatchIndex = batchSize > 1 ? batchSize - 1 : -1;
+
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
+                            topic.getName(), subscriptionName, position, partitionIndex);
+                }
+
+                MessageIdData messageId = MessageIdData.newBuilder()
+                        .setLedgerId(position.getLedgerId())
+                        .setEntryId(position.getEntryId())
+                        .setPartition(partitionIndex)
+                        .setBatchIndex(largestBatchIndex).build();
+
+                ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
+            }
+        });
+    }
+
     @Override
     protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace 
commandGetTopicsOfNamespace) {
         final long requestId = commandGetTopicsOfNamespace.getRequestId();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 06afe93..95f6f1a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.RawBatchConverter;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -95,7 +96,10 @@ public class TwoPhaseCompactor extends Compactor {
                     } else {
                         log.info("Commencing phase one of compaction for {}, 
reading to {}",
                                  reader.getTopic(), lastMessageId);
-                        phaseOneLoop(reader, Optional.empty(), 
Optional.empty(), lastMessageId, latestForKey,
+                        // Each entry is processed as a whole, discard the 
batchIndex part deliberately.
+                        MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
+                        MessageIdImpl lastEntryMessageId = new 
MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), 
lastImpl.getPartitionIndex());
+                        phaseOneLoop(reader, Optional.empty(), 
Optional.empty(), lastEntryMessageId, latestForKey,
                                 loopPromise);
                     }
                 });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 7eda446..552f69d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -91,6 +91,17 @@ public class TopicReaderTest extends ProducerConsumerBase {
         };
     }
 
+    @DataProvider // parameter rows consumed by testHasMessageAvailable(enableBatch, startInclusive)
+    public static Object[][] variationsForHasMessageAvailable() {
+        return new Object[][] {
+                // columns: batching enabled / start-inclusive; all four combinations are covered
+                {true,  true},
+                {true,  false},
+                {false, true},
+                {false, false},
+        };
+    }
+
     @Test
     public void testSimpleReader() throws Exception {
         Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader")
@@ -531,6 +542,68 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     }
 
+    /**
+     * Checks {@code hasMessageAvailable()} on readers started at every published message id.
+     *
+     * <p>With an inclusive start every reader must report an available message. With an exclusive start only
+     * the reader positioned on the last published id must report none.
+     *
+     * @param enableBatch    whether the producer batches messages (10 per batch)
+     * @param startInclusive whether readers are created with {@code startMessageIdInclusive()}
+     */
+    @Test(dataProvider = "variationsForHasMessageAvailable")
+    public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/HasMessageAvailable";
+        final int numOfMessage = 100;
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic(topicName);
+
+        if (enableBatch) {
+            producerBuilder
+                    .enableBatching(true)
+                    .batchingMaxMessages(10);
+        } else {
+            producerBuilder
+                    .enableBatching(false);
+        }
+
+        Producer<byte[]> producer = producerBuilder.create();
+
+        CountDownLatch latch = new CountDownLatch(numOfMessage);
+
+        List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());
+
+        for (int i = 0; i < numOfMessage; i++) {
+            producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
+                // Record successful sends only; a failed send is caught by the size assertion below.
+                if (e == null) {
+                    allIds.add(mid);
+                }
+                // Always count down: throwing from this callback would skip it and block latch.await() forever.
+                latch.countDown();
+            });
+        }
+
+        latch.await();
+
+        // Fails the test on the test thread if any send did not complete successfully.
+        Assert.assertEquals(allIds.size(), numOfMessage);
+
+        allIds.sort(null); // make sure the largest mid appears at last.
+        MessageId lastId = allIds.get(allIds.size() - 1);
+
+        for (MessageId id : allIds) {
+            Reader<byte[]> reader;
+
+            if (startInclusive) {
+                reader = pulsarClient.newReader().topic(topicName)
+                        .startMessageId(id).startMessageIdInclusive().create();
+            } else {
+                reader = pulsarClient.newReader().topic(topicName)
+                        .startMessageId(id).create();
+            }
+
+            try {
+                if (startInclusive || !id.equals(lastId)) {
+                    assertTrue(reader.hasMessageAvailable());
+                } else {
+                    // Exclusive start on the very last message: nothing is left to read.
+                    assertFalse(reader.hasMessageAvailable());
+                }
+            } finally {
+                // Close the reader even when an assertion fails so it does not leak into later tests.
+                reader.close();
+            }
+        }
+
+        producer.close();
+    }
+
     @Test
     public void testReaderNonDurableIsAbleToSeekRelativeTime() throws 
Exception {
         final int numOfMessage = 10;
@@ -794,7 +867,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
                 .batchingMaxMessages(10)
                 .create();
 
-        CountDownLatch latch = new CountDownLatch(100);
+        CountDownLatch latch = new CountDownLatch(numOfMessage);
 
         List<MessageId> allIds = Collections.synchronizedList(new 
ArrayList<>());
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 5f97a58..b75cfce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -145,7 +145,9 @@ public class ReaderTest extends MockedPulsarServiceBaseTest 
{
         while (reader.hasMessageAvailable()) {
             Assert.assertTrue(keys.remove(reader.readNext().getKey()));
         }
-        Assert.assertTrue(keys.isEmpty());
+        // start from latest with start message inclusive should only read the 
last message in batch
+        Assert.assertTrue(keys.size() == 9);
+        Assert.assertFalse(keys.contains("key9"));
         Assert.assertFalse(reader.hasMessageAvailable());
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 9431496..ce4ee1e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -104,7 +104,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     @SuppressWarnings("unused")
     private volatile int availablePermits = 0;
 
-    protected volatile MessageId lastDequeuedMessage = MessageId.earliest;
+    protected volatile MessageId lastDequeuedMessageId = MessageId.earliest;
     private volatile MessageId lastMessageIdInBroker = MessageId.earliest;
 
     private long subscribeTimeout;
@@ -188,7 +188,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = conf.getSubscriptionMode();
         this.startMessageId = startMessageId != null ? new 
BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
-        this.lastDequeuedMessage = startMessageId == null ? MessageId.earliest 
: startMessageId;
         this.initialStartMessageId = this.startMessageId;
         this.startMessageRollbackDurationInSec = 
startMessageRollbackDurationInSec;
         AVAILABLE_PERMITS_UPDATER.set(this, 0);
@@ -677,7 +676,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (duringSeek.compareAndSet(true, false)) {
             return seekMessageId;
         } else if (subscriptionMode == SubscriptionMode.Durable) {
-            return null;
+            return startMessageId;
         }
 
         if (!currentMessageQueue.isEmpty()) {
@@ -695,10 +694,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             }
 
             return previousMessage;
-        } else if (!lastDequeuedMessage.equals(MessageId.earliest)) {
+        } else if (!lastDequeuedMessageId.equals(MessageId.earliest)) {
             // If the queue was empty we need to restart from the message just 
after the last one that has been dequeued
             // in the past
-            return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessage);
+            return new BatchMessageIdImpl((MessageIdImpl) 
lastDequeuedMessageId);
         } else {
             // No message was received or dequeued by this consumer. Next 
message would still be the startMessageId
             return startMessageId;
@@ -1118,7 +1117,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     protected synchronized void messageProcessed(Message<?> msg) {
         ClientCnx currentCnx = cnx();
         ClientCnx msgCnx = ((MessageImpl<?>) msg).getCnx();
-        lastDequeuedMessage = msg.getMessageId();
+        lastDequeuedMessageId = msg.getMessageId();
 
         if (msgCnx != currentCnx) {
             // The processed message did belong to the old queue that was 
cleared after reconnection.
@@ -1493,6 +1492,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
             seekMessageId = new BatchMessageIdImpl((MessageIdImpl) 
MessageId.earliest);
             duringSeek.set(true);
+            lastDequeuedMessageId = MessageId.earliest;
 
             incomingMessages.clear();
             INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
@@ -1539,6 +1539,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
             seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId);
             duringSeek.set(true);
+            lastDequeuedMessageId = MessageId.earliest;
 
             incomingMessages.clear();
             INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
@@ -1555,17 +1556,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     public boolean hasMessageAvailable() throws PulsarClientException {
-        // we need to seek to the last position then the last message can be 
received when the resetIncludeHead
-        // specified.
-        if (lastDequeuedMessage == MessageId.latest && resetIncludeHead) {
-            lastDequeuedMessage = getLastMessageId();
-            seek(lastDequeuedMessage);
-        }
         try {
-            if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
-                return true;
-            }
-
             return hasMessageAvailableAsync().get();
         } catch (Exception e) {
             throw PulsarClientException.unwrap(e);
@@ -1575,12 +1566,56 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     public CompletableFuture<Boolean> hasMessageAvailableAsync() {
         final CompletableFuture<Boolean> booleanFuture = new 
CompletableFuture<>();
 
-        if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
-            booleanFuture.complete(true);
+        // we haven't read yet. use startMessageId for comparison
+        if (lastDequeuedMessageId == MessageId.earliest) {
+            // if we are starting from latest, we should seek to the actual 
last message first.
+            // allow the last one to be read when read head inclusively.
+            if (startMessageId.getLedgerId() == Long.MAX_VALUE &&
+                    startMessageId.getEntryId() == Long.MAX_VALUE &&
+                    startMessageId.partitionIndex == -1) {
+
+                getLastMessageIdAsync()
+                        .thenCompose(this::seekAsync)
+                        .whenComplete((ignore, e) -> {
+                            if (e != null) {
+                                log.error("[{}][{}] Failed getLastMessageId 
command", topic, subscription);
+                                
booleanFuture.completeExceptionally(e.getCause());
+                            } else {
+                                booleanFuture.complete(resetIncludeHead);
+                            }
+                        });
+
+                return booleanFuture;
+            }
+
+            if (hasMoreMessages(lastMessageIdInBroker, startMessageId, 
resetIncludeHead)) {
+                booleanFuture.complete(true);
+                return booleanFuture;
+            }
+
+            getLastMessageIdAsync().thenAccept(messageId -> {
+                lastMessageIdInBroker = messageId;
+                if (hasMoreMessages(lastMessageIdInBroker, startMessageId, 
resetIncludeHead)) {
+                    booleanFuture.complete(true);
+                } else {
+                    booleanFuture.complete(false);
+                }
+            }).exceptionally(e -> {
+                log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+                booleanFuture.completeExceptionally(e.getCause());
+                return null;
+            });
+
         } else {
+            // read before, use lastDequeueMessage for comparison
+            if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, 
false)) {
+                booleanFuture.complete(true);
+                return booleanFuture;
+            }
+
             getLastMessageIdAsync().thenAccept(messageId -> {
                 lastMessageIdInBroker = messageId;
-                if (hasMoreMessages(lastMessageIdInBroker, 
lastDequeuedMessage)) {
+                if (hasMoreMessages(lastMessageIdInBroker, 
lastDequeuedMessageId, false)) {
                     booleanFuture.complete(true);
                 } else {
                     booleanFuture.complete(false);
@@ -1591,18 +1626,22 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 return null;
             });
         }
+
         return booleanFuture;
     }
 
-    private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId 
lastDequeuedMessage) {
-        if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+    private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId 
messageId, boolean inclusive) {
+        if (inclusive && lastMessageIdInBroker.compareTo(messageId) >= 0 && // inclusive: an equal id still counts as available
                 ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
             return true;
-        } else {
-            // Make sure batching message can be read completely.
-            return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0
-                && incomingMessages.size() > 0;
         }
+
+        if (!inclusive && lastMessageIdInBroker.compareTo(messageId) > 0 && // exclusive: broker id must be strictly greater
+                ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+            return true;
+        }
+
+        return false; // also reached whenever the broker's last entry id is -1
     }
 
     @Override
@@ -1647,8 +1686,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             cnx.sendGetLastMessageId(getLastIdCmd, 
requestId).thenAccept((result) -> {
                 log.info("[{}][{}] Successfully getLastMessageId {}:{}",
                     topic, subscription, result.getLedgerId(), 
result.getEntryId());
-                future.complete(new MessageIdImpl(result.getLedgerId(),
-                    result.getEntryId(), result.getPartition()));
+                if (result.getBatchIndex() < 0) {
+                    future.complete(new MessageIdImpl(result.getLedgerId(),
+                            result.getEntryId(), result.getPartition()));
+                } else {
+                    future.complete(new 
BatchMessageIdImpl(result.getLedgerId(),
+                            result.getEntryId(), result.getPartition(), 
result.getBatchIndex()));
+                }
             }).exceptionally(e -> {
                 log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
                 future.completeExceptionally(
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 94c8dd3..a0de070 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -98,7 +98,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> 
{
             }
             do {
                 message = incomingMessages.take();
-                lastDequeuedMessage = message.getMessageId();
+                lastDequeuedMessageId = message.getMessageId();
                 ClientCnx msgCnx = ((MessageImpl<?>) message).getCnx();
                 // synchronized need to prevent race between connectionOpened 
and the check "msgCnx == cnx()"
                 synchronized (this) {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index 335bf96..371f47f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -162,4 +162,7 @@ public class RawMessageImpl implements RawMessage {
         return msgMetadata.get().getPartitionKeyB64Encoded();
     }
 
+    public int getBatchSize() { // exposes getNumMessagesInBatch() from this message's metadata
+        return msgMetadata.get().getNumMessagesInBatch();
+    }
 }

Reply via email to