merlimat closed pull request #1066: Issue 937: add CommandGetLastMessageId to 
make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index e13664c27..9149bb9f9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -334,4 +334,11 @@
      * @param config
      */
     void setConfig(ManagedLedgerConfig config);
+
+    /**
+     * Gets last confirmed entry of the managed ledger.
+     *
+     * @return the last confirmed entry id
+     */
+    Position getLastConfirmedEntry();
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 80a0bbed8..89d3476ea 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1062,7 +1062,7 @@ private void closeAllCursors(CloseCallback callback, 
final Object ctx) {
         Futures.waitForAll(futures).thenRun(() -> {
             callback.closeComplete(ctx);
         }).exceptionally(exception -> {
-            
callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx);
+            
callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()),
 ctx);
             return null;
         });
     }
@@ -1282,7 +1282,7 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
             }).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position 
{} - {}", name, opReadEntry.readPosition,
                         ex.getMessage());
-                
opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), 
opReadEntry.ctx);
+                
opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
 opReadEntry.ctx);
                 return null;
             });
         }
@@ -1351,7 +1351,7 @@ void asyncReadEntry(PositionImpl position, 
ReadEntryCallback callback, Object ct
                 entryCache.asyncReadEntry(ledger, position, callback, ctx);
             }).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position 
{} - {}", name, position, ex.getMessage());
-                
callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx);
+                
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
 ctx);
                 return null;
             });
         }
@@ -2173,7 +2173,8 @@ public int getPendingAddEntriesCount() {
         return pendingAddEntries.size();
     }
 
-    public PositionImpl getLastConfirmedEntry() {
+    @Override
+    public Position getLastConfirmedEntry() {
         return lastConfirmedEntry;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0b8d51280..cb473c875 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -31,10 +31,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
-
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
@@ -59,6 +57,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
@@ -110,7 +109,7 @@
     private String originalPrincipal = null;
     private Set<String> proxyRoles;
     private boolean authenticateOriginalAuthData;
-    
+
     enum State {
         Start, Connected, Failed
     }
@@ -192,8 +191,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, 
Throwable cause) throws E
     }
 
     /*
-     * If authentication and authorization is enabled and if the authRole is 
one of proxyRoles we want to enforce 
-     * - the originalPrincipal is given while connecting 
+     * If authentication and authorization is enabled and if the authRole is 
one of proxyRoles we want to enforce
+     * - the originalPrincipal is given while connecting
      * - originalPrincipal is not blank
      * - originalPrincipal is not a proxy principal
      */
@@ -218,7 +217,7 @@ protected void handleLookup(CommandLookupTopic lookup) {
         if (topicName == null) {
             return;
         }
-        
+
         String originalPrincipal = null;
         if (authenticateOriginalAuthData && lookup.hasOriginalAuthData()) {
             originalPrincipal = validateOriginalPrincipal(
@@ -233,9 +232,9 @@ protected void handleLookup(CommandLookupTopic lookup) {
         } else {
             originalPrincipal = lookup.hasOriginalPrincipal() ? 
lookup.getOriginalPrincipal() : this.originalPrincipal;
         }
-        
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
-        if (lookupSemaphore.tryAcquire()) {            
+        if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {
                 final String msg = "Valid Proxy Client role should be provided 
for lookup ";
                 log.warn("[{}] {} with role {} and proxyClientAuthRole {} on 
topic {}", remoteAddress, msg, authRole,
@@ -319,7 +318,7 @@ protected void 
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
         } else {
             originalPrincipal = partitionMetadata.hasOriginalPrincipal() ? 
partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
         }
-        
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {
@@ -441,7 +440,7 @@ protected void handleConsumerStats(CommandConsumerStats 
commandConsumerStats) {
 
         return commandConsumerStatsResponseBuilder;
     }
-    
+
     private String validateOriginalPrincipal(String originalAuthData, String 
originalAuthMethod, String originalPrincipal, Long requestId, 
GeneratedMessageLite request) {
         ChannelHandler sslHandler = 
ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
         SSLSession sslSession = null;
@@ -461,7 +460,7 @@ private String validateOriginalPrincipal(String 
originalAuthData, String origina
             return null;
         }
     }
-    
+
     private String getOriginalPrincipal(String originalAuthData, String 
originalAuthMethod, String originalPrincipal,
             SSLSession sslSession) throws AuthenticationException {
         if (authenticateOriginalAuthData) {
@@ -532,7 +531,7 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
         DestinationName topicName = validateTopicName(subscribe.getTopic(), 
requestId, subscribe);
         if (topicName == null) {
             return;
-        }  
+        }
 
         if (invalidOriginalPrincipal(originalPrincipal)) {
             final String msg = "Valid Proxy Client role should be provided 
while subscribing ";
@@ -1104,6 +1103,35 @@ protected void handleCloseConsumer(CommandCloseConsumer 
closeConsumer) {
         }
     }
 
+    @Override
+    protected void handleGetLastMessageId(CommandGetLastMessageId 
getLastMessageId) {
+        checkArgument(state == State.Connected);
+
+        CompletableFuture<Consumer> consumerFuture = 
consumers.get(getLastMessageId.getConsumerId());
+
+        if (consumerFuture != null && consumerFuture.isDone() && 
!consumerFuture.isCompletedExceptionally()) {
+            Consumer consumer = consumerFuture.getNow(null);
+            long requestId = getLastMessageId.getRequestId();
+
+            Topic topic = consumer.getSubscription().getTopic();
+            Position position = topic.getLastMessageId();
+            int partitionIndex = 
DestinationName.getPartitionIndex(topic.getName());
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex 
{}", remoteAddress,
+                    topic.getName(), consumer.getSubscription().getName(), 
position, partitionIndex);
+            }
+            MessageIdData messageId = MessageIdData.newBuilder()
+                .setLedgerId(((PositionImpl)position).getLedgerId())
+                .setEntryId(((PositionImpl)position).getEntryId())
+                .setPartition(partitionIndex)
+                .build();
+
+            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
messageId));
+        } else {
+            
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), 
ServerError.MetadataError, "Consumer not found"));
+        }
+    }
+
     @Override
     protected boolean isHandshakeCompleted() {
         return state == State.Connected;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 7a426b7e4..80aed778b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
@@ -122,4 +123,6 @@ void updateRates(NamespaceStats nsStats, 
NamespaceBundleStats currentBundleStats
     PersistentTopicStats getStats();
 
     PersistentTopicInternalStats getInternalStats();
+
+    Position getLastMessageId();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 44a9e1443..588c494d2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -909,6 +910,11 @@ public boolean isEncryptionRequired() {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public Position getLastMessageId() {
+        throw new UnsupportedOperationException("getLastMessageId is not 
supported on non-persistent topic");
+    }
+
     public void markBatchMessagePublished() {
         this.hasBatchMessagePublished = true;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 20ae52726..a17e2de54 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1536,5 +1536,10 @@ public long getLastPublishedSequenceId(String 
producerName) {
         return messageDeduplication.getLastPublishedSequenceId(producerName);
     }
 
+    @Override
+    public Position getLastMessageId() {
+        return ledger.getLastConfirmedEntry();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopic.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index df36b8e3c..e8f635fb7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -19,7 +19,11 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -28,8 +32,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.policies.data.PersistentTopicStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,8 +42,6 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Sets;
-
 public class TopicReaderTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(TopicReaderTest.class);
 
@@ -359,4 +361,103 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> keyMe
         reader.close();
         log.info("-- Exiting {} test --", methodName);
     }
+
+
+    @Test
+    public void testSimpleReaderReachEndofTopic() throws Exception {
+        ReaderConfiguration conf = new ReaderConfiguration();
+        Reader reader = 
pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", 
MessageId.earliest,
+            conf);
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", 
producerConf);
+
+        // no data write, should return false
+        assertFalse(reader.hasMessageAvailable());
+
+        // produce message 0 -- 99
+        for (int i = 0; i < 100; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        MessageImpl msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        int index = 0;
+
+        // read message till end.
+        while (reader.hasMessageAvailable()) {
+            msg = (MessageImpl) reader.readNext(1, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + (index ++);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+
+        assertEquals(index, 100);
+        // readNext should return null after reaching the end of the topic.
+        assertNull(reader.readNext(1, TimeUnit.SECONDS));
+
+        // produce message again.
+        for (int i = 100; i < 200; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // read message till end again.
+        while (reader.hasMessageAvailable()) {
+            msg = (MessageImpl) reader.readNext(1, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + (index ++);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+
+        assertEquals(index, 200);
+        // readNext should return null after reaching the end of the topic.
+        assertNull(reader.readNext(1, TimeUnit.SECONDS));
+
+        producer.close();
+    }
+
+    @Test
+    public void testReaderReachEndofTopicOnMessageWithBatches() throws 
Exception {
+        Reader reader = pulsarClient.createReader(
+            
"persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches",
 MessageId.earliest,
+            new ReaderConfiguration());
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(true);
+        producerConf.setBatchingMaxPublishDelay(100, TimeUnit.MILLISECONDS);
+        Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches",
 producerConf);
+
+        // no data write, should return false
+        assertFalse(reader.hasMessageAvailable());
+
+        for (int i = 0; i < 100; i++) {
+            String message = "my-message-" + i;
+            producer.sendAsync(message.getBytes());
+        }
+
+        // Write one sync message to ensure everything before it got persisted
+        producer.send("my-message-10".getBytes());
+
+        MessageId lastMessageId = null;
+        int index = 0;
+        assertTrue(reader.hasMessageAvailable());
+
+        if (reader.hasMessageAvailable()) {
+            Message msg = reader.readNext();
+            lastMessageId = msg.getMessageId();
+            assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class);
+
+            while (msg != null) {
+                index++;
+                msg = reader.readNext(100, TimeUnit.MILLISECONDS);
+            }
+            assertEquals(index, 101);
+        }
+
+        assertFalse(reader.hasMessageAvailable());
+        producer.close();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java
index 4b89470d2..d29b23838 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -62,4 +62,14 @@
      * Return true if the topic was terminated and this reader has reached the 
end of the topic
      */
     boolean hasReachedEndOfTopic();
+
+    /**
+     * Check if there is any message available to read from the current 
position.
+     */
+    boolean hasMessageAvailable() throws PulsarClientException;
+
+    /**
+     * Asynchronously check if there is any message that has been published 
successfully to the broker in the topic.
+     */
+    CompletableFuture<Boolean> hasMessageAvailableAsync();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 1064654d0..58cdedeb6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -44,6 +44,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse;
@@ -52,6 +53,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
@@ -76,6 +78,8 @@
             16, 1);
     private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> 
pendingLookupRequests = new ConcurrentLongHashMap<>(
             16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> 
pendingGetLastMessageIdRequests = new ConcurrentLongHashMap<>(
+        16, 1);
     private final ConcurrentLongHashMap<ProducerImpl> producers = new 
ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<ConsumerImpl> consumers = new 
ConcurrentLongHashMap<>(16, 1);
 
@@ -158,6 +162,7 @@ public void channelInactive(ChannelHandlerContext ctx) 
throws Exception {
         // Fail out all the pending ops
         pendingRequests.forEach((key, future) -> 
future.completeExceptionally(e));
         pendingLookupRequests.forEach((key, future) -> 
future.completeExceptionally(e));
+        pendingGetLastMessageIdRequests.forEach((key, future) -> 
future.completeExceptionally(e));
 
         // Notify all attached producers/consumers so they have a chance to 
reconnect
         producers.forEach((id, producer) -> producer.connectionClosed(this));
@@ -263,6 +268,22 @@ protected void handleSuccess(CommandSuccess success) {
         }
     }
 
+    @Override
+    protected void 
handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse success) {
+        checkArgument(state == State.Ready);
+
+        if (log.isDebugEnabled()) {
+            log.debug("{} Received success GetLastMessageId response from 
server: {}", ctx.channel(), success.getRequestId());
+        }
+        long requestId = success.getRequestId();
+        CompletableFuture<MessageIdData> requestFuture = 
pendingGetLastMessageIdRequests.remove(requestId);
+        if (requestFuture != null) {
+            requestFuture.complete(success.getLastMessageId());
+        } else {
+            log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), success.getRequestId());
+        }
+    }
+
     @Override
     protected void handleProducerSuccess(CommandProducerSuccess success) {
         checkArgument(state == State.Ready);
@@ -511,6 +532,22 @@ SocketAddress serverAddrees() {
         return future;
     }
 
+    public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf 
request, long requestId) {
+        CompletableFuture<MessageIdData> future = new CompletableFuture<>();
+
+        pendingGetLastMessageIdRequests.put(requestId, future);
+
+        ctx.writeAndFlush(request).addListener(writeFuture -> {
+            if (!writeFuture.isSuccess()) {
+                log.warn("{} Failed to send GetLastMessageId request to 
broker: {}", ctx.channel(), writeFuture.cause().getMessage());
+                pendingGetLastMessageIdRequests.remove(requestId);
+                future.completeExceptionally(writeFuture.cause());
+            }
+        });
+
+        return future;
+    }
+
     /**
      * check serverError and take appropriate action
      * <ul>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b74bb139e..a43689971 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,19 +25,33 @@
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
+import com.google.common.collect.Iterables;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Timeout;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
@@ -60,13 +74,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Iterables;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Timeout;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
 public class ConsumerImpl extends ConsumerBase {
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
 
@@ -78,7 +85,8 @@
             .newUpdater(ConsumerImpl.class, "availablePermits");
     private volatile int availablePermits = 0;
 
-    private MessageIdImpl lastDequeuedMessage;
+    private MessageId lastDequeuedMessage = MessageId.earliest;
+    private MessageId lastMessageIdInBroker = MessageId.earliest;
 
     private long subscribeTimeout;
     private final int partitionIndex;
@@ -278,7 +286,7 @@ private Message fetchSingleMessageFromBroker() throws 
PulsarClientException {
             }
             do {
                 message = incomingMessages.take();
-                lastDequeuedMessage = (MessageIdImpl) message.getMessageId();
+                lastDequeuedMessage = message.getMessageId();
                 ClientCnx msgCnx = ((MessageImpl) message).getCnx();
                 // synchronized need to prevent race between connectionOpened 
and the check "msgCnx == cnx()"
                 synchronized (ConsumerImpl.this) {
@@ -639,10 +647,10 @@ private BatchMessageIdImpl clearReceiverQueue() {
             }
 
             return previousMessage;
-        } else if (lastDequeuedMessage != null) {
+        } else if (!lastDequeuedMessage.equals(MessageId.earliest)) {
             // If the queue was empty we need to restart from the message just 
after the last one that has been dequeued
             // in the past
-            return new BatchMessageIdImpl(lastDequeuedMessage);
+            return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessage);
         } else {
             // No message was received or dequeued by this consumer. Next 
message would still be the startMessageId
             return startMessageId;
@@ -964,7 +972,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata 
msgMetadata, ByteBuf unc
     protected synchronized void messageProcessed(Message msg) {
         ClientCnx currentCnx = cnx();
         ClientCnx msgCnx = ((MessageImpl) msg).getCnx();
-        lastDequeuedMessage = (MessageIdImpl) msg.getMessageId();
+        lastDequeuedMessage = msg.getMessageId();
 
         if (msgCnx != currentCnx) {
             // The processed message did belong to the old queue that was 
cleared after reconnection.
@@ -1248,6 +1256,101 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
         return seekFuture;
     }
 
+    public boolean hasMessageAvailable() throws PulsarClientException {
+        try {
+            if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+                ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+                return true;
+            }
+
+            return hasMessageAvailableAsync().get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        final CompletableFuture<Boolean> booleanFuture = new 
CompletableFuture<>();
+
+        if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+            ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+            booleanFuture.complete(true);
+        } else {
+            getLastMessageIdAsync().thenAccept(messageId -> {
+                lastMessageIdInBroker = messageId;
+                if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+                    ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+                    booleanFuture.complete(true);
+                } else {
+                    booleanFuture.complete(false);
+                }
+            }).exceptionally(e -> {
+                log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+                booleanFuture.completeExceptionally(e.getCause());
+                return null;
+            });
+        }
+        return booleanFuture;
+    }
+
+    private CompletableFuture<MessageId> getLastMessageIdAsync() {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil
+                .failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+        }
+
+        AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+        Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
+            opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
+            0 , TimeUnit.MILLISECONDS);
+        CompletableFuture<MessageId> getLastMessageIdFuture = new 
CompletableFuture<>();
+
+        internalGetLastMessageIdAsync(backoff, opTimeoutMs, 
getLastMessageIdFuture);
+        return getLastMessageIdFuture;
+    }
+
+    private void internalGetLastMessageIdAsync(final Backoff backoff,
+                                               final AtomicLong remainingTime,
+                                               CompletableFuture<MessageId> 
future) {
+        ClientCnx cnx = cnx();
+        if (isConnected() && cnx != null) {
+            if 
(!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion()))
 {
+                future.completeExceptionally(new PulsarClientException
+                    .NotSupportedException("GetLastMessageId Not supported for 
ProtocolVersion: " +
+                    cnx.getRemoteEndpointProtocolVersion()));
+            }
+
+            long requestId = client.newRequestId();
+            ByteBuf getLastIdCmd = Commands.newGetLastMessageId(consumerId, 
requestId);
+            log.info("[{}][{}] Get topic last message Id", topic, 
subscription);
+
+            cnx.sendGetLastMessageId(getLastIdCmd, 
requestId).thenAccept((result) -> {
+                log.info("[{}][{}] Successfully getLastMessageId {}:{}",
+                    topic, subscription, result.getLedgerId(), 
result.getEntryId());
+                future.complete(new MessageIdImpl(result.getLedgerId(),
+                    result.getEntryId(), result.getPartition()));
+            }).exceptionally(e -> {
+                log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+                future.completeExceptionally(e.getCause());
+                return null;
+            });
+        } else {
+            long nextDelay = Math.min(backoff.next(), remainingTime.get());
+            if (nextDelay <= 0) {
+                future.completeExceptionally(new PulsarClientException
+                    .TimeoutException("Could not getLastMessageId within 
configured timeout."));
+                return;
+            }
+
+            ((ScheduledExecutorService) listenerExecutor).schedule(() -> {
+                log.warn("[{}] [{}] Could not get connection while 
getLastMessageId -- Will try again in {} ms",
+                    topic, getHandlerName(), nextDelay);
+                remainingTime.addAndGet(-nextDelay);
+                internalGetLastMessageIdAsync(backoff, remainingTime, future);
+            }, nextDelay, TimeUnit.MILLISECONDS);
+        }
+    }
+
     private MessageIdImpl getMessageIdImpl(Message msg) {
         MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId();
         if (messageId instanceof BatchMessageIdImpl) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 5b50ef523..bf0dc2fdc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -23,7 +23,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
@@ -133,4 +132,14 @@ public void close() throws IOException {
         return consumer.closeAsync();
     }
 
+    /**
+     * Synchronous availability check; the answer comes straight from the
+     * wrapped consumer's {@code hasMessageAvailable()}.
+     *
+     * @return whatever the wrapped consumer reports
+     * @throws PulsarClientException propagated unchanged from the consumer call
+     */
+    @Override
+    public boolean hasMessageAvailable() throws PulsarClientException {
+        final boolean messageAvailable = consumer.hasMessageAvailable();
+        return messageAvailable;
+    }
+
+    /**
+     * Asynchronous availability check; hands back the future produced by the
+     * wrapped consumer's {@code hasMessageAvailableAsync()} without adapting it.
+     *
+     * @return the consumer's own future
+     */
+    @Override
+    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        final CompletableFuture<Boolean> pendingAnswer = consumer.hasMessageAvailableAsync();
+        return pendingAnswer;
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index cb607206a..907c52fe0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -41,7 +41,7 @@ public ExecutorProvider(int numThreads, String 
threadNamePrefix) {
         checkNotNull(threadNamePrefix);
         executors = Lists.newArrayListWithCapacity(numThreads);
         for (int i = 0; i < numThreads; i++) {
-            executors.add(Executors.newSingleThreadExecutor(new 
DefaultThreadFactory(threadNamePrefix)));
+            executors.add(Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory(threadNamePrefix)));
         }
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 4376b013f..2912a737a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -21,11 +21,14 @@
 import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
 import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
 
+import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
@@ -40,6 +43,7 @@
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
@@ -67,12 +71,6 @@
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 
-import com.google.protobuf.ByteString;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
 public class Commands {
 
     public static final short magicCrc32c = 0x0e01;
@@ -123,7 +121,7 @@ public static ByteBuf newConnect(String authMethodName, 
String authData, int pro
         if (originalAuthData != null) {
             connectBuilder.setOriginalAuthData(originalAuthData);
         }
-        
+
         if (originalAuthMethod != null) {
             connectBuilder.setOriginalAuthMethod(originalAuthMethod);
         }
@@ -637,6 +635,29 @@ static ByteBuf newPong() {
         return cmdPong.retainedDuplicate();
     }
 
+    /**
+     * Serializes a {@code GET_LAST_MESSAGE_ID} request.
+     *
+     * <p>A {@link CommandGetLastMessageId} carrying the two ids is wrapped in a
+     * {@code BaseCommand} of type {@code GET_LAST_MESSAGE_ID} and written out
+     * through {@code serializeWithSize}.
+     *
+     * @param consumerId value stored in the command's {@code consumer_id} field
+     * @param requestId  value stored in the command's {@code request_id} field
+     * @return the buffer returned by {@code serializeWithSize}
+     */
+    public static ByteBuf newGetLastMessageId(long consumerId, long requestId) {
+        CommandGetLastMessageId.Builder cmdBuilder = CommandGetLastMessageId.newBuilder();
+        cmdBuilder.setConsumerId(consumerId).setRequestId(requestId);
+        CommandGetLastMessageId cmd = cmdBuilder.build();
+
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
+            .setType(Type.GET_LAST_MESSAGE_ID)
+            .setGetLastMessageId(cmd));
+
+        // The builder and the message it built are separate recycler-backed
+        // objects; hand both back once serialization is done, as newLookup does.
+        cmdBuilder.recycle();
+        cmd.recycle();
+        return res;
+    }
+
+    /**
+     * Serializes a {@code GET_LAST_MESSAGE_ID_RESPONSE} frame.
+     *
+     * <p>The supplied message id and request id are placed in a
+     * {@code CommandGetLastMessageIdResponse}, which is wrapped in a
+     * {@code BaseCommand} and written out through {@code serializeWithSize}.
+     * {@code messageIdData} is only read here; it is not recycled by this method.
+     *
+     * @param requestId     value stored in the response's {@code request_id} field
+     * @param messageIdData value stored in the response's {@code last_message_id} field
+     * @return the buffer returned by {@code serializeWithSize}
+     */
+    public static ByteBuf newGetLastMessageIdResponse(long requestId, MessageIdData messageIdData) {
+        PulsarApi.CommandGetLastMessageIdResponse.Builder response =
+            PulsarApi.CommandGetLastMessageIdResponse.newBuilder()
+                .setLastMessageId(messageIdData)
+                .setRequestId(requestId);
+        PulsarApi.CommandGetLastMessageIdResponse built = response.build();
+
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
+            .setType(Type.GET_LAST_MESSAGE_ID_RESPONSE)
+            .setGetLastMessageIdResponse(built));
+
+        // The builder and the message it built are separate recycler-backed
+        // objects; hand both back once serialization is done, as newLookup does.
+        response.recycle();
+        built.recycle();
+        return res;
+    }
+
     private static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) {
         // / Wire format
         // [TOTAL_SIZE] [CMD_SIZE][CMD]
@@ -931,4 +952,8 @@ public static ByteBuf newLookup(String topic, boolean 
authoritative, String orig
         lookupBroker.recycle();
         return res;
     }
+
+    /**
+     * Tells whether a peer's protocol version is high enough for the
+     * get-last-message-id command, i.e. at least {@code ProtocolVersion.v12}.
+     *
+     * @param peerVersion numeric protocol version reported by the peer
+     * @return {@code true} when {@code peerVersion} is v12 or newer
+     */
+    public static boolean peerSupportsGetLastMessageId(int peerVersion) {
+        final int minimumVersion = ProtocolVersion.v12.getNumber();
+        return minimumVersion <= peerVersion;
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index e9e113a57..8d2389bf6 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -20,6 +20,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
@@ -253,6 +254,18 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
                 handleReachedEndOfTopic(cmd.getReachedEndOfTopic());
                 cmd.getReachedEndOfTopic().recycle();
                 break;
+
+            case GET_LAST_MESSAGE_ID:
+                checkArgument(cmd.hasGetLastMessageId());
+                handleGetLastMessageId(cmd.getGetLastMessageId());
+                cmd.getGetLastMessageId().recycle();
+                break;
+
+            case GET_LAST_MESSAGE_ID_RESPONSE:
+                checkArgument(cmd.hasGetLastMessageIdResponse());
+                
handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse());
+                cmd.getGetLastMessageIdResponse().recycle();
+                break;
             }
         } finally {
             if (cmdBuilder != null) {
@@ -377,5 +390,12 @@ protected void 
handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Hook called by {@code channelRead} when a {@code GET_LAST_MESSAGE_ID}
+     * command is decoded. This base implementation always throws.
+     * NOTE(review): presumably overridden by the side that serves the
+     * request -- confirm against the subclasses.
+     *
+     * @param getLastMessageId the decoded command
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    protected void handleGetLastMessageId(PulsarApi.CommandGetLastMessageId 
getLastMessageId) {
+        throw new UnsupportedOperationException();
+    }
+    /**
+     * Hook called by {@code channelRead} when a
+     * {@code GET_LAST_MESSAGE_ID_RESPONSE} command is decoded. This base
+     * implementation always throws.
+     * NOTE(review): presumably overridden by the side that issued the
+     * request -- confirm against the subclasses.
+     *
+     * @param success the decoded response command
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    protected void 
handleGetLastMessageIdSuccess(PulsarApi.CommandGetLastMessageIdResponse 
success) {
+        throw new UnsupportedOperationException();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PulsarDecoder.class);
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 2c131251d..27cc63ae6 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -199,6 +199,7 @@ private AuthMethod(int index, int value) {
     v9(9, 9),
     v10(10, 10),
     v11(11, 11),
+    v12(12, 12),
     ;
     
     public static final int v0_VALUE = 0;
@@ -213,6 +214,7 @@ private AuthMethod(int index, int value) {
     public static final int v9_VALUE = 9;
     public static final int v10_VALUE = 10;
     public static final int v11_VALUE = 11;
+    public static final int v12_VALUE = 12;
     
     
     public final int getNumber() { return value; }
@@ -231,6 +233,7 @@ public static ProtocolVersion valueOf(int value) {
         case 9: return v9;
         case 10: return v10;
         case 11: return v11;
+        case 12: return v12;
         default: return null;
       }
     }
@@ -20067,6 +20070,831 @@ public Builder clearMsgBacklog() {
     // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandConsumerStatsResponse)
   }
   
+  public interface CommandGetLastMessageIdOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required uint64 consumer_id = 1;
+    boolean hasConsumerId();
+    long getConsumerId();
+    
+    // required uint64 request_id = 2;
+    boolean hasRequestId();
+    long getRequestId();
+  }
+  public static final class CommandGetLastMessageId extends
+      com.google.protobuf.GeneratedMessageLite
+      implements CommandGetLastMessageIdOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use CommandGetLastMessageId.newBuilder() to construct.
+    private final io.netty.util.Recycler.Handle<CommandGetLastMessageId> 
handle;
+    private 
CommandGetLastMessageId(io.netty.util.Recycler.Handle<CommandGetLastMessageId> 
handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandGetLastMessageId> 
RECYCLER = new io.netty.util.Recycler<CommandGetLastMessageId>() {
+            protected CommandGetLastMessageId 
newObject(Handle<CommandGetLastMessageId> handle) {
+              return new CommandGetLastMessageId(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this);
+        }
+         
+    private CommandGetLastMessageId(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final CommandGetLastMessageId defaultInstance;
+    public static CommandGetLastMessageId getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandGetLastMessageId getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 consumer_id = 1;
+    public static final int CONSUMER_ID_FIELD_NUMBER = 1;
+    private long consumerId_;
+    public boolean hasConsumerId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getConsumerId() {
+      return consumerId_;
+    }
+    
+    // required uint64 request_id = 2;
+    public static final int REQUEST_ID_FIELD_NUMBER = 2;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
+    private void initFields() {
+      consumerId_ = 0L;
+      requestId_ = 0L;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasConsumerId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasRequestId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, consumerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeUInt64(2, requestId_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(1, consumerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(2, requestId_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId, Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> 
handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        consumerId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
result = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.consumerId_ = consumerId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.requestId_ = requestId_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
other) {
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance())
 return this;
+        if (other.hasConsumerId()) {
+          setConsumerId(other.getConsumerId());
+        }
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasConsumerId()) {
+          
+          return false;
+        }
+        if (!hasRequestId()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              consumerId_ = input.readUInt64();
+              break;
+            }
+            case 16: {
+              bitField0_ |= 0x00000002;
+              requestId_ = input.readUInt64();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required uint64 consumer_id = 1;
+      private long consumerId_ ;
+      public boolean hasConsumerId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getConsumerId() {
+        return consumerId_;
+      }
+      public Builder setConsumerId(long value) {
+        bitField0_ |= 0x00000001;
+        consumerId_ = value;
+        
+        return this;
+      }
+      public Builder clearConsumerId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        consumerId_ = 0L;
+        
+        return this;
+      }
+      
+      // required uint64 request_id = 2;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public long getRequestId() {
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000002;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
+      // 
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetLastMessageId)
+    }
+    
+    static {
+      defaultInstance = new CommandGetLastMessageId(true);
+      defaultInstance.initFields();
+    }
+    
+    // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandGetLastMessageId)
+  }
+  
+  public interface CommandGetLastMessageIdResponseOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required .pulsar.proto.MessageIdData last_message_id = 1;
+    boolean hasLastMessageId();
+    org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
getLastMessageId();
+    
+    // required uint64 request_id = 2;
+    boolean hasRequestId();
+    long getRequestId();
+  }
+  public static final class CommandGetLastMessageIdResponse extends
+      com.google.protobuf.GeneratedMessageLite
+      implements CommandGetLastMessageIdResponseOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use CommandGetLastMessageIdResponse.newBuilder() to construct.
+    private final 
io.netty.util.Recycler.Handle<CommandGetLastMessageIdResponse> handle;
+    private 
CommandGetLastMessageIdResponse(io.netty.util.Recycler.Handle<CommandGetLastMessageIdResponse>
 handle) {
+      this.handle = handle;
+    }
+    
+     private static final 
io.netty.util.Recycler<CommandGetLastMessageIdResponse> RECYCLER = new 
io.netty.util.Recycler<CommandGetLastMessageIdResponse>() {
+            protected CommandGetLastMessageIdResponse 
newObject(Handle<CommandGetLastMessageIdResponse> handle) {
+              return new CommandGetLastMessageIdResponse(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this);
+        }
+         
+    private CommandGetLastMessageIdResponse(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final CommandGetLastMessageIdResponse defaultInstance;
+    public static CommandGetLastMessageIdResponse getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandGetLastMessageIdResponse getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required .pulsar.proto.MessageIdData last_message_id = 1;
+    public static final int LAST_MESSAGE_ID_FIELD_NUMBER = 1;
+    private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
lastMessageId_;
+    public boolean hasLastMessageId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
getLastMessageId() {
+      return lastMessageId_;
+    }
+    
+    // required uint64 request_id = 2;
+    public static final int REQUEST_ID_FIELD_NUMBER = 2;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
+    private void initFields() {
+      lastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+      requestId_ = 0L;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasLastMessageId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasRequestId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!getLastMessageId().isInitialized()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeMessage(1, lastMessageId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeUInt64(2, requestId_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(1, lastMessageId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(2, requestId_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse, 
Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponseOrBuilder,
 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> 
handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        lastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
build() {
+        
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
buildPartial() {
+        
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
result = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.lastMessageId_ = lastMessageId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.requestId_ = requestId_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse
 other) {
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance())
 return this;
+        if (other.hasLastMessageId()) {
+          mergeLastMessageId(other.getLastMessageId());
+        }
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasLastMessageId()) {
+          
+          return false;
+        }
+        if (!hasRequestId()) {
+          
+          return false;
+        }
+        if (!getLastMessageId().isInitialized()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 10: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder();
+              if (hasLastMessageId()) {
+                subBuilder.mergeFrom(getLastMessageId());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setLastMessageId(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
+            case 16: {
+              bitField0_ |= 0x00000002;
+              requestId_ = input.readUInt64();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required .pulsar.proto.MessageIdData last_message_id = 1;
+      private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
lastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+      public boolean hasLastMessageId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
getLastMessageId() {
+        return lastMessageId_;
+      }
+      public Builder 
setLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        lastMessageId_ = value;
+        
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder setLastMessageId(
+          org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder 
builderForValue) {
+        lastMessageId_ = builderForValue.build();
+        
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder 
mergeLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
value) {
+        if (((bitField0_ & 0x00000001) == 0x00000001) &&
+            lastMessageId_ != 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance())
 {
+          lastMessageId_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder(lastMessageId_).mergeFrom(value).buildPartial();
+        } else {
+          lastMessageId_ = value;
+        }
+        
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder clearLastMessageId() {
+        lastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      
+      // required uint64 request_id = 2;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public long getRequestId() {
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000002;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
+      // 
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetLastMessageIdResponse)
+    }
+    
+    static {
+      defaultInstance = new CommandGetLastMessageIdResponse(true);
+      defaultInstance.initFields();
+    }
+    
+    // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandGetLastMessageIdResponse)
+  }
+  
   public interface BaseCommandOrBuilder
       extends com.google.protobuf.MessageLiteOrBuilder {
     
@@ -20181,6 +21009,14 @@ public Builder clearMsgBacklog() {
     // optional .pulsar.proto.CommandSeek seek = 28;
     boolean hasSeek();
     org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek getSeek();
+    
+    // optional .pulsar.proto.CommandGetLastMessageId getLastMessageId = 29;
+    boolean hasGetLastMessageId();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
getGetLastMessageId();
+    
+    // optional .pulsar.proto.CommandGetLastMessageIdResponse 
getLastMessageIdResponse = 30;
+    boolean hasGetLastMessageIdResponse();
+    
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
getGetLastMessageIdResponse();
   }
   public static final class BaseCommand extends
       com.google.protobuf.GeneratedMessageLite
@@ -20247,6 +21083,8 @@ public BaseCommand getDefaultInstanceForType() {
       CONSUMER_STATS_RESPONSE(24, 26),
       REACHED_END_OF_TOPIC(25, 27),
       SEEK(26, 28),
+      GET_LAST_MESSAGE_ID(27, 29),
+      GET_LAST_MESSAGE_ID_RESPONSE(28, 30),
       ;
       
       public static final int CONNECT_VALUE = 2;
@@ -20276,6 +21114,8 @@ public BaseCommand getDefaultInstanceForType() {
       public static final int CONSUMER_STATS_RESPONSE_VALUE = 26;
       public static final int REACHED_END_OF_TOPIC_VALUE = 27;
       public static final int SEEK_VALUE = 28;
+      public static final int GET_LAST_MESSAGE_ID_VALUE = 29;
+      public static final int GET_LAST_MESSAGE_ID_RESPONSE_VALUE = 30;
       
       
       public final int getNumber() { return value; }
@@ -20309,6 +21149,8 @@ public static Type valueOf(int value) {
           case 26: return CONSUMER_STATS_RESPONSE;
           case 27: return REACHED_END_OF_TOPIC;
           case 28: return SEEK;
+          case 29: return GET_LAST_MESSAGE_ID;
+          case 30: return GET_LAST_MESSAGE_ID_RESPONSE;
           default: return null;
         }
       }
@@ -20615,6 +21457,26 @@ public boolean hasSeek() {
       return seek_;
     }
     
+    // optional .pulsar.proto.CommandGetLastMessageId getLastMessageId = 29;
+    public static final int GETLASTMESSAGEID_FIELD_NUMBER = 29;
+    private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
getLastMessageId_;
+    public boolean hasGetLastMessageId() {
+      return ((bitField0_ & 0x10000000) == 0x10000000);
+    }
+    public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
getGetLastMessageId() {
+      return getLastMessageId_;
+    }
+    
+    // optional .pulsar.proto.CommandGetLastMessageIdResponse 
getLastMessageIdResponse = 30;
+    public static final int GETLASTMESSAGEIDRESPONSE_FIELD_NUMBER = 30;
+    private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
getLastMessageIdResponse_;
+    public boolean hasGetLastMessageIdResponse() {
+      return ((bitField0_ & 0x20000000) == 0x20000000);
+    }
+    public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
getGetLastMessageIdResponse() {
+      return getLastMessageIdResponse_;
+    }
+    
     private void initFields() {
       type_ = 
org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT;
       connect_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance();
@@ -20644,6 +21506,8 @@ private void initFields() {
       consumerStatsResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse.getDefaultInstance();
       reachedEndOfTopic_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopic.getDefaultInstance();
       seek_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance();
+      getLastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
+      getLastMessageIdResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -20804,6 +21668,18 @@ public final boolean isInitialized() {
           return false;
         }
       }
+      if (hasGetLastMessageId()) {
+        if (!getGetLastMessageId().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      if (hasGetLastMessageIdResponse()) {
+        if (!getGetLastMessageIdResponse().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -20900,6 +21776,12 @@ public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x08000000) == 0x08000000)) {
         output.writeMessage(28, seek_);
       }
+      if (((bitField0_ & 0x10000000) == 0x10000000)) {
+        output.writeMessage(29, getLastMessageId_);
+      }
+      if (((bitField0_ & 0x20000000) == 0x20000000)) {
+        output.writeMessage(30, getLastMessageIdResponse_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -21020,6 +21902,14 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(28, seek_);
       }
+      if (((bitField0_ & 0x10000000) == 0x10000000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(29, getLastMessageId_);
+      }
+      if (((bitField0_ & 0x20000000) == 0x20000000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(30, getLastMessageIdResponse_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -21189,6 +22079,10 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x04000000);
         seek_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance();
         bitField0_ = (bitField0_ & ~0x08000000);
+        getLastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x10000000);
+        getLastMessageIdResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x20000000);
         return this;
       }
       
@@ -21334,6 +22228,14 @@ public Builder clone() {
           to_bitField0_ |= 0x08000000;
         }
         result.seek_ = seek_;
+        if (((from_bitField0_ & 0x10000000) == 0x10000000)) {
+          to_bitField0_ |= 0x10000000;
+        }
+        result.getLastMessageId_ = getLastMessageId_;
+        if (((from_bitField0_ & 0x20000000) == 0x20000000)) {
+          to_bitField0_ |= 0x20000000;
+        }
+        result.getLastMessageIdResponse_ = getLastMessageIdResponse_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -21424,6 +22326,12 @@ public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.BaseComman
         if (other.hasSeek()) {
           mergeSeek(other.getSeek());
         }
+        if (other.hasGetLastMessageId()) {
+          mergeGetLastMessageId(other.getGetLastMessageId());
+        }
+        if (other.hasGetLastMessageIdResponse()) {
+          mergeGetLastMessageIdResponse(other.getGetLastMessageIdResponse());
+        }
         return this;
       }
       
@@ -21582,6 +22490,18 @@ public final boolean isInitialized() {
             return false;
           }
         }
+        if (hasGetLastMessageId()) {
+          if (!getGetLastMessageId().isInitialized()) {
+            
+            return false;
+          }
+        }
+        if (hasGetLastMessageIdResponse()) {
+          if (!getGetLastMessageIdResponse().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -21886,6 +22806,26 @@ public Builder mergeFrom(
               subBuilder.recycle();
               break;
             }
+            case 234: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.Builder 
subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.newBuilder();
+              if (hasGetLastMessageId()) {
+                subBuilder.mergeFrom(getGetLastMessageId());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetLastMessageId(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
+            case 242: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.Builder
 subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.newBuilder();
+              if (hasGetLastMessageIdResponse()) {
+                subBuilder.mergeFrom(getGetLastMessageIdResponse());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetLastMessageIdResponse(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
@@ -23077,6 +24017,92 @@ public Builder clearSeek() {
         return this;
       }
       
+      // optional .pulsar.proto.CommandGetLastMessageId getLastMessageId = 29;
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
getLastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
+      public boolean hasGetLastMessageId() {
+        return ((bitField0_ & 0x10000000) == 0x10000000);
+      }
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
getGetLastMessageId() {
+        return getLastMessageId_;
+      }
+      public Builder 
setGetLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId
 value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        getLastMessageId_ = value;
+        
+        bitField0_ |= 0x10000000;
+        return this;
+      }
+      public Builder setGetLastMessageId(
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.Builder 
builderForValue) {
+        getLastMessageId_ = builderForValue.build();
+        
+        bitField0_ |= 0x10000000;
+        return this;
+      }
+      public Builder 
mergeGetLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId
 value) {
+        if (((bitField0_ & 0x10000000) == 0x10000000) &&
+            getLastMessageId_ != 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance())
 {
+          getLastMessageId_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.newBuilder(getLastMessageId_).mergeFrom(value).buildPartial();
+        } else {
+          getLastMessageId_ = value;
+        }
+        
+        bitField0_ |= 0x10000000;
+        return this;
+      }
+      public Builder clearGetLastMessageId() {
+        getLastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x10000000);
+        return this;
+      }
+      
+      // optional .pulsar.proto.CommandGetLastMessageIdResponse 
getLastMessageIdResponse = 30;
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
getLastMessageIdResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
+      public boolean hasGetLastMessageIdResponse() {
+        return ((bitField0_ & 0x20000000) == 0x20000000);
+      }
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
getGetLastMessageIdResponse() {
+        return getLastMessageIdResponse_;
+      }
+      public Builder 
setGetLastMessageIdResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse
 value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        getLastMessageIdResponse_ = value;
+        
+        bitField0_ |= 0x20000000;
+        return this;
+      }
+      public Builder setGetLastMessageIdResponse(
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.Builder
 builderForValue) {
+        getLastMessageIdResponse_ = builderForValue.build();
+        
+        bitField0_ |= 0x20000000;
+        return this;
+      }
+      public Builder 
mergeGetLastMessageIdResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse
 value) {
+        if (((bitField0_ & 0x20000000) == 0x20000000) &&
+            getLastMessageIdResponse_ != 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance())
 {
+          getLastMessageIdResponse_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.newBuilder(getLastMessageIdResponse_).mergeFrom(value).buildPartial();
+        } else {
+          getLastMessageIdResponse_ = value;
+        }
+        
+        bitField0_ |= 0x20000000;
+        return this;
+      }
+      public Builder clearGetLastMessageIdResponse() {
+        getLastMessageIdResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x20000000);
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 964fd526e..ca3d2fb3d 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -135,6 +135,7 @@ enum ProtocolVersion {
        v9 = 9;  // Added end of topic notification
        v10 = 10;// Added proxy to broker
        v11 = 11;// C++ consumers before this version are not correctly 
handling the checksum field
+       v12 = 12;//Added get topic's last messageId from broker
 }
 
 message CommandConnect {
@@ -152,13 +153,13 @@ message CommandConnect {
        // a Pulsar proxy. In this case the auth info above
        // will be the auth of the proxy itself
        optional string original_principal = 7;
-       
-       // Original auth role and auth Method that was passed 
+
+       // Original auth role and auth Method that was passed
        // to the proxy. In this case the auth info above
-       // will be the auth of the proxy itself 
+       // will be the auth of the proxy itself
        optional string original_auth_data = 8;
        optional string original_auth_method = 9;
-       
+
 }
 
 message CommandConnected {
@@ -202,9 +203,9 @@ message CommandPartitionedTopicMetadata {
        // Original principal that was verified by
        // a Pulsar proxy.
        optional string original_principal = 3;
-       
-       // Original auth role and auth Method that was passed 
-       // to the proxy.        
+
+       // Original auth role and auth Method that was passed
+       // to the proxy.
        optional string original_auth_data = 4;
        optional string original_auth_method = 5;
 }
@@ -225,13 +226,13 @@ message CommandLookupTopic {
        required string topic            = 1;
        required uint64 request_id       = 2;
        optional bool authoritative      = 3 [default = false];
-       
+
        // Original principal that was verified by
        // a Pulsar proxy.
        optional string original_principal = 4;
-       
-       // Original auth role and auth Method that was passed 
-       // to the proxy.        
+
+       // Original auth role and auth Method that was passed
+       // to the proxy.
        optional string original_auth_data = 5;
        optional string original_auth_method = 6;
 }
@@ -340,7 +341,7 @@ message CommandUnsubscribe {
 message CommandSeek {
        required uint64 consumer_id = 1;
        required uint64 request_id  = 2;
-       
+
        optional MessageIdData message_id = 3;
 }
 
@@ -374,7 +375,7 @@ message CommandSuccess {
 message CommandProducerSuccess {
        required uint64 request_id    = 1;
        required string producer_name = 2;
-       
+
        // The last sequence id that was stored by this producer in the 
previous session
        // This will only be meaningful if deduplication has been enabled.
        optional int64  last_sequence_id = 3 [default = -1];
@@ -443,6 +444,16 @@ message CommandConsumerStatsResponse {
         optional uint64 msgBacklog                  = 15;
 }
 
+message CommandGetLastMessageId {
+       required uint64 consumer_id = 1;
+       required uint64 request_id  = 2;
+}
+
+message CommandGetLastMessageIdResponse {
+       required MessageIdData last_message_id = 1;
+       required uint64 request_id  = 2;
+}
+
 message BaseCommand {
        enum Type {
                CONNECT     = 2;
@@ -486,6 +497,9 @@ message BaseCommand {
                REACHED_END_OF_TOPIC = 27;
 
                SEEK = 28;
+
+               GET_LAST_MESSAGE_ID = 29;
+               GET_LAST_MESSAGE_ID_RESPONSE = 30;
        }
 
        required Type type = 1;
@@ -524,6 +538,11 @@ message BaseCommand {
        optional CommandConsumerStatsResponse consumerStatsResponse         = 
26;
 
        optional CommandReachedEndOfTopic reachedEndOfTopic  = 27;
-       
+
        optional CommandSeek seek = 28;
+
+       optional CommandGetLastMessageId getLastMessageId = 29;
+       optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30;
+
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to