This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7404952  Issue 937: add CommandGetLastMessageId to make reader know 
the end of topic (#1066)
7404952 is described below

commit 74049522a1d97e1171c9088acd638b426b6de015
Author: Jia Zhai <zhaiji...@gmail.com>
AuthorDate: Tue Feb 13 21:21:29 2018 -0800

    Issue 937: add CommandGetLastMessageId to make reader know the end of topic 
(#1066)
    
    * add CommandGetLastMessageId to getlastMessageId of topic
    
    * rebase master, change following comments
    
    * add partition index in GetLastMessageIdResponse
    
    * fix rebase error
    
    * bump proto version to v11
    
    * change following comments
    
    * change following comments2
    
    * change following comments3
    
    * change following comments
    
    * get cnx() first
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |    7 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |    9 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   52 +-
 .../org/apache/pulsar/broker/service/Topic.java    |    3 +
 .../service/nonpersistent/NonPersistentTopic.java  |    6 +
 .../broker/service/persistent/PersistentTopic.java |    5 +
 .../apache/pulsar/client/api/TopicReaderTest.java  |  107 +-
 .../java/org/apache/pulsar/client/api/Reader.java  |   10 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   37 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  131 ++-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |   11 +-
 .../pulsar/client/util/ExecutorProvider.java       |    2 +-
 .../org/apache/pulsar/common/api/Commands.java     |   41 +-
 .../apache/pulsar/common/api/PulsarDecoder.java    |   20 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 1026 ++++++++++++++++++++
 pulsar-common/src/main/proto/PulsarApi.proto       |   47 +-
 16 files changed, 1457 insertions(+), 57 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index e13664c..9149bb9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -334,4 +334,11 @@ public interface ManagedLedger {
      * @param config
      */
     void setConfig(ManagedLedgerConfig config);
+
+    /**
+     * Gets last confirmed entry of the managed ledger.
+     *
+     * @return the last confirmed entry id
+     */
+    Position getLastConfirmedEntry();
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 80a0bbe..89d3476 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1062,7 +1062,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         Futures.waitForAll(futures).thenRun(() -> {
             callback.closeComplete(ctx);
         }).exceptionally(exception -> {
-            
callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx);
+            
callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()),
 ctx);
             return null;
         });
     }
@@ -1282,7 +1282,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             }).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position 
{} - {}", name, opReadEntry.readPosition,
                         ex.getMessage());
-                
opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), 
opReadEntry.ctx);
+                
opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
 opReadEntry.ctx);
                 return null;
             });
         }
@@ -1351,7 +1351,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 entryCache.asyncReadEntry(ledger, position, callback, ctx);
             }).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position 
{} - {}", name, position, ex.getMessage());
-                
callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx);
+                
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
 ctx);
                 return null;
             });
         }
@@ -2173,7 +2173,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         return pendingAddEntries.size();
     }
 
-    public PositionImpl getLastConfirmedEntry() {
+    @Override
+    public Position getLastConfirmedEntry() {
         return lastConfirmedEntry;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0b8d512..cb473c8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -31,10 +31,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
-
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
@@ -59,6 +57,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
@@ -110,7 +109,7 @@ public class ServerCnx extends PulsarHandler {
     private String originalPrincipal = null;
     private Set<String> proxyRoles;
     private boolean authenticateOriginalAuthData;
-    
+
     enum State {
         Start, Connected, Failed
     }
@@ -192,8 +191,8 @@ public class ServerCnx extends PulsarHandler {
     }
 
     /*
-     * If authentication and authorization is enabled and if the authRole is 
one of proxyRoles we want to enforce 
-     * - the originalPrincipal is given while connecting 
+     * If authentication and authorization is enabled and if the authRole is 
one of proxyRoles we want to enforce
+     * - the originalPrincipal is given while connecting
      * - originalPrincipal is not blank
      * - originalPrincipal is not a proxy principal
      */
@@ -218,7 +217,7 @@ public class ServerCnx extends PulsarHandler {
         if (topicName == null) {
             return;
         }
-        
+
         String originalPrincipal = null;
         if (authenticateOriginalAuthData && lookup.hasOriginalAuthData()) {
             originalPrincipal = validateOriginalPrincipal(
@@ -233,9 +232,9 @@ public class ServerCnx extends PulsarHandler {
         } else {
             originalPrincipal = lookup.hasOriginalPrincipal() ? 
lookup.getOriginalPrincipal() : this.originalPrincipal;
         }
-        
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
-        if (lookupSemaphore.tryAcquire()) {            
+        if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {
                 final String msg = "Valid Proxy Client role should be provided 
for lookup ";
                 log.warn("[{}] {} with role {} and proxyClientAuthRole {} on 
topic {}", remoteAddress, msg, authRole,
@@ -319,7 +318,7 @@ public class ServerCnx extends PulsarHandler {
         } else {
             originalPrincipal = partitionMetadata.hasOriginalPrincipal() ? 
partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
         }
-        
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {
@@ -441,7 +440,7 @@ public class ServerCnx extends PulsarHandler {
 
         return commandConsumerStatsResponseBuilder;
     }
-    
+
     private String validateOriginalPrincipal(String originalAuthData, String 
originalAuthMethod, String originalPrincipal, Long requestId, 
GeneratedMessageLite request) {
         ChannelHandler sslHandler = 
ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
         SSLSession sslSession = null;
@@ -461,7 +460,7 @@ public class ServerCnx extends PulsarHandler {
             return null;
         }
     }
-    
+
     private String getOriginalPrincipal(String originalAuthData, String 
originalAuthMethod, String originalPrincipal,
             SSLSession sslSession) throws AuthenticationException {
         if (authenticateOriginalAuthData) {
@@ -532,7 +531,7 @@ public class ServerCnx extends PulsarHandler {
         DestinationName topicName = validateTopicName(subscribe.getTopic(), 
requestId, subscribe);
         if (topicName == null) {
             return;
-        }  
+        }
 
         if (invalidOriginalPrincipal(originalPrincipal)) {
             final String msg = "Valid Proxy Client role should be provided 
while subscribing ";
@@ -1105,6 +1104,35 @@ public class ServerCnx extends PulsarHandler {
     }
 
     @Override
+    protected void handleGetLastMessageId(CommandGetLastMessageId 
getLastMessageId) {
+        checkArgument(state == State.Connected);
+
+        CompletableFuture<Consumer> consumerFuture = 
consumers.get(getLastMessageId.getConsumerId());
+
+        if (consumerFuture != null && consumerFuture.isDone() && 
!consumerFuture.isCompletedExceptionally()) {
+            Consumer consumer = consumerFuture.getNow(null);
+            long requestId = getLastMessageId.getRequestId();
+
+            Topic topic = consumer.getSubscription().getTopic();
+            Position position = topic.getLastMessageId();
+            int partitionIndex = 
DestinationName.getPartitionIndex(topic.getName());
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex 
{}", remoteAddress,
+                    topic.getName(), consumer.getSubscription().getName(), 
position, partitionIndex);
+            }
+            MessageIdData messageId = MessageIdData.newBuilder()
+                .setLedgerId(((PositionImpl)position).getLedgerId())
+                .setEntryId(((PositionImpl)position).getEntryId())
+                .setPartition(partitionIndex)
+                .build();
+
+            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
messageId));
+        } else {
+            
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), 
ServerError.MetadataError, "Consumer not found"));
+        }
+    }
+
+    @Override
     protected boolean isHandshakeCompleted() {
         return state == State.Connected;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 7a426b7..80aed77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
@@ -122,4 +123,6 @@ public interface Topic {
     PersistentTopicStats getStats();
 
     PersistentTopicInternalStats getInternalStats();
+
+    Position getLastMessageId();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 44a9e14..588c494 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -909,6 +910,11 @@ public class NonPersistentTopic implements Topic {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public Position getLastMessageId() {
+        throw new UnsupportedOperationException("getLastMessageId is not 
supported on non-persistent topic");
+    }
+
     public void markBatchMessagePublished() {
         this.hasBatchMessagePublished = true;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 20ae527..a17e2de 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1536,5 +1536,10 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
         return messageDeduplication.getLastPublishedSequenceId(producerName);
     }
 
+    @Override
+    public Position getLastMessageId() {
+        return ledger.getLastConfirmedEntry();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopic.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index df36b8e..e8f635f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -19,7 +19,11 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -28,8 +32,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.policies.data.PersistentTopicStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,8 +42,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Sets;
-
 public class TopicReaderTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(TopicReaderTest.class);
 
@@ -359,4 +361,103 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
         reader.close();
         log.info("-- Exiting {} test --", methodName);
     }
+
+
+    @Test
+    public void testSimpleReaderReachEndofTopic() throws Exception {
+        ReaderConfiguration conf = new ReaderConfiguration();
+        Reader reader = 
pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", 
MessageId.earliest,
+            conf);
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", 
producerConf);
+
+        // no data write, should return false
+        assertFalse(reader.hasMessageAvailable());
+
+        // produce message 0 -- 99
+        for (int i = 0; i < 100; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        MessageImpl msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        int index = 0;
+
+        // read message till end.
+        while (reader.hasMessageAvailable()) {
+            msg = (MessageImpl) reader.readNext(1, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + (index ++);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+
+        assertEquals(index, 100);
+        // readNext should return null, after reach the end of topic.
+        assertNull(reader.readNext(1, TimeUnit.SECONDS));
+
+        // produce message again.
+        for (int i = 100; i < 200; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // read message till end again.
+        while (reader.hasMessageAvailable()) {
+            msg = (MessageImpl) reader.readNext(1, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + (index ++);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+
+        assertEquals(index, 200);
+        // readNext should return null, after reach the end of topic.
+        assertNull(reader.readNext(1, TimeUnit.SECONDS));
+
+        producer.close();
+    }
+
+    @Test
+    public void testReaderReachEndofTopicOnMessageWithBatches() throws 
Exception {
+        Reader reader = pulsarClient.createReader(
+            
"persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches",
 MessageId.earliest,
+            new ReaderConfiguration());
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(true);
+        producerConf.setBatchingMaxPublishDelay(100, TimeUnit.MILLISECONDS);
+        Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches",
 producerConf);
+
+        // no data write, should return false
+        assertFalse(reader.hasMessageAvailable());
+
+        for (int i = 0; i < 100; i++) {
+            String message = "my-message-" + i;
+            producer.sendAsync(message.getBytes());
+        }
+
+        // Write one sync message to ensure everything before got persisted
+        producer.send("my-message-10".getBytes());
+
+        MessageId lastMessageId = null;
+        int index = 0;
+        assertTrue(reader.hasMessageAvailable());
+
+        if (reader.hasMessageAvailable()) {
+            Message msg = reader.readNext();
+            lastMessageId = msg.getMessageId();
+            assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class);
+
+            while (msg != null) {
+                index++;
+                msg = reader.readNext(100, TimeUnit.MILLISECONDS);
+            }
+            assertEquals(index, 101);
+        }
+
+        assertFalse(reader.hasMessageAvailable());
+        producer.close();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java
index 4b89470..d29b238 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -62,4 +62,14 @@ public interface Reader extends Closeable {
      * Return true if the topic was terminated and this reader has reached the 
end of the topic
      */
     boolean hasReachedEndOfTopic();
+
+    /**
+     * Check if there is any message available to read from the current 
position.
+     */
+    boolean hasMessageAvailable() throws PulsarClientException;
+
+    /**
+     * Asynchronously check if there is any message that has been published 
successfully to the broker in the topic.
+     */
+    CompletableFuture<Boolean> hasMessageAvailableAsync();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 1064654..58cdede 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -44,6 +44,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse;
@@ -52,6 +53,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
@@ -76,6 +78,8 @@ public class ClientCnx extends PulsarHandler {
             16, 1);
     private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> 
pendingLookupRequests = new ConcurrentLongHashMap<>(
             16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> 
pendingGetLastMessageIdRequests = new ConcurrentLongHashMap<>(
+        16, 1);
     private final ConcurrentLongHashMap<ProducerImpl> producers = new 
ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<ConsumerImpl> consumers = new 
ConcurrentLongHashMap<>(16, 1);
 
@@ -158,6 +162,7 @@ public class ClientCnx extends PulsarHandler {
         // Fail out all the pending ops
         pendingRequests.forEach((key, future) -> 
future.completeExceptionally(e));
         pendingLookupRequests.forEach((key, future) -> 
future.completeExceptionally(e));
+        pendingGetLastMessageIdRequests.forEach((key, future) -> 
future.completeExceptionally(e));
 
         // Notify all attached producers/consumers so they have a chance to 
reconnect
         producers.forEach((id, producer) -> producer.connectionClosed(this));
@@ -264,6 +269,22 @@ public class ClientCnx extends PulsarHandler {
     }
 
     @Override
+    protected void 
handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse success) {
+        checkArgument(state == State.Ready);
+
+        if (log.isDebugEnabled()) {
+            log.debug("{} Received success GetLastMessageId response from 
server: {}", ctx.channel(), success.getRequestId());
+        }
+        long requestId = success.getRequestId();
+        CompletableFuture<MessageIdData> requestFuture = 
pendingGetLastMessageIdRequests.remove(requestId);
+        if (requestFuture != null) {
+            requestFuture.complete(success.getLastMessageId());
+        } else {
+            log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), success.getRequestId());
+        }
+    }
+
+    @Override
     protected void handleProducerSuccess(CommandProducerSuccess success) {
         checkArgument(state == State.Ready);
 
@@ -511,6 +532,22 @@ public class ClientCnx extends PulsarHandler {
         return future;
     }
 
+    public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf 
request, long requestId) {
+        CompletableFuture<MessageIdData> future = new CompletableFuture<>();
+
+        pendingGetLastMessageIdRequests.put(requestId, future);
+
+        ctx.writeAndFlush(request).addListener(writeFuture -> {
+            if (!writeFuture.isSuccess()) {
+                log.warn("{} Failed to send GetLastMessageId request to 
broker: {}", ctx.channel(), writeFuture.cause().getMessage());
+                pendingGetLastMessageIdRequests.remove(requestId);
+                future.completeExceptionally(writeFuture.cause());
+            }
+        });
+
+        return future;
+    }
+
     /**
      * check serverError and take appropriate action
      * <ul>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b74bb13..a436899 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,19 +25,33 @@ import static 
org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
+import com.google.common.collect.Iterables;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Timeout;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
@@ -60,13 +74,6 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Iterables;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Timeout;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
 public class ConsumerImpl extends ConsumerBase {
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
 
@@ -78,7 +85,8 @@ public class ConsumerImpl extends ConsumerBase {
             .newUpdater(ConsumerImpl.class, "availablePermits");
     private volatile int availablePermits = 0;
 
-    private MessageIdImpl lastDequeuedMessage;
+    private MessageId lastDequeuedMessage = MessageId.earliest;
+    private MessageId lastMessageIdInBroker = MessageId.earliest;
 
     private long subscribeTimeout;
     private final int partitionIndex;
@@ -278,7 +286,7 @@ public class ConsumerImpl extends ConsumerBase {
             }
             do {
                 message = incomingMessages.take();
-                lastDequeuedMessage = (MessageIdImpl) message.getMessageId();
+                lastDequeuedMessage = message.getMessageId();
                 ClientCnx msgCnx = ((MessageImpl) message).getCnx();
                 // synchronized need to prevent race between connectionOpened 
and the check "msgCnx == cnx()"
                 synchronized (ConsumerImpl.this) {
@@ -639,10 +647,10 @@ public class ConsumerImpl extends ConsumerBase {
             }
 
             return previousMessage;
-        } else if (lastDequeuedMessage != null) {
+        } else if (!lastDequeuedMessage.equals(MessageId.earliest)) {
             // If the queue was empty we need to restart from the message just 
after the last one that has been dequeued
             // in the past
-            return new BatchMessageIdImpl(lastDequeuedMessage);
+            return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessage);
         } else {
             // No message was received or dequeued by this consumer. Next 
message would still be the startMessageId
             return startMessageId;
@@ -964,7 +972,7 @@ public class ConsumerImpl extends ConsumerBase {
     protected synchronized void messageProcessed(Message msg) {
         ClientCnx currentCnx = cnx();
         ClientCnx msgCnx = ((MessageImpl) msg).getCnx();
-        lastDequeuedMessage = (MessageIdImpl) msg.getMessageId();
+        lastDequeuedMessage = msg.getMessageId();
 
         if (msgCnx != currentCnx) {
             // The processed message did belong to the old queue that was 
cleared after reconnection.
@@ -1248,6 +1256,101 @@ public class ConsumerImpl extends ConsumerBase {
         return seekFuture;
     }
 
+    /**
+     * Blocking variant of {@link #hasMessageAvailableAsync()}.
+     *
+     * <p>Returns {@code true} immediately when the cached last message id of the broker compares
+     * greater than the last dequeued message and its entry id is not -1; otherwise waits for the
+     * result of the asynchronous check.
+     *
+     * @return {@code true} if a message past the last dequeued one is available
+     * @throws PulsarClientException if the asynchronous check fails or the wait is interrupted
+     */
+    public boolean hasMessageAvailable() throws PulsarClientException {
+        try {
+            if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+                ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+                return true;
+            }
+
+            return hasMessageAvailableAsync().get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        } catch (ExecutionException e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    /**
+     * Asynchronously checks whether a message past the last dequeued one is available.
+     *
+     * <p>If the cached last message id of the broker already compares greater than the last
+     * dequeued message (and its entry id is not -1) the future is completed with {@code true}
+     * without a broker round trip. Otherwise the last message id is fetched, cached in
+     * {@code lastMessageIdInBroker}, and the same comparison decides the result.
+     *
+     * @return future holding the availability flag, or failed with the cause of the lookup error
+     */
+    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
+
+        if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+            ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+            booleanFuture.complete(true);
+        } else {
+            getLastMessageIdAsync().thenAccept(messageId -> {
+                lastMessageIdInBroker = messageId;
+                booleanFuture.complete(lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+                    ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1);
+            }).exceptionally(e -> {
+                // Unwrap the completion wrapper when there is one; never hand null to
+                // completeExceptionally, which would throw and leave booleanFuture pending.
+                Throwable cause = e.getCause() != null ? e.getCause() : e;
+                log.error("[{}][{}] Failed getLastMessageId command", topic, subscription, cause);
+                booleanFuture.completeExceptionally(cause);
+                return null;
+            });
+        }
+        return booleanFuture;
+    }
+
+    /**
+     * Starts a lookup of the topic's last message id.
+     *
+     * <p>Returns an already-failed future ({@code AlreadyClosedException}) when the consumer state
+     * is {@code Closing} or {@code Closed}. Otherwise creates a backoff and a time budget from the
+     * client's operation timeout and hands them, with a fresh future, to
+     * {@code internalGetLastMessageIdAsync}.
+     *
+     * @return the future passed to {@code internalGetLastMessageIdAsync}
+     */
+    private CompletableFuture<MessageId> getLastMessageIdAsync() {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+        }
+
+        final long operationTimeoutMs = client.getConfiguration().getOperationTimeoutMs();
+        final AtomicLong remainingMs = new AtomicLong(operationTimeoutMs);
+        final Backoff retryBackoff = new Backoff(
+            100, TimeUnit.MILLISECONDS,
+            operationTimeoutMs * 2, TimeUnit.MILLISECONDS,
+            0, TimeUnit.MILLISECONDS);
+
+        final CompletableFuture<MessageId> lastMessageIdFuture = new CompletableFuture<>();
+        internalGetLastMessageIdAsync(retryBackoff, remainingMs, lastMessageIdFuture);
+        return lastMessageIdFuture;
+    }
+
+    /**
+     * Sends a GetLastMessageId request on the current connection, or reschedules itself on the
+     * listener executor while no connection is available.
+     *
+     * @param backoff supplies the delay before the next attempt when not connected
+     * @param remainingTime time budget in milliseconds; each waited delay is subtracted from it
+     * @param future completed with the last message id, or exceptionally when the request fails,
+     *               the budget is exhausted, or the peer's protocol version lacks the command
+     */
+    private void internalGetLastMessageIdAsync(final Backoff backoff,
+                                               final AtomicLong remainingTime,
+                                               CompletableFuture<MessageId> future) {
+        ClientCnx cnx = cnx();
+        if (isConnected() && cnx != null) {
+            if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) {
+                future.completeExceptionally(new PulsarClientException
+                    .NotSupportedException("GetLastMessageId Not supported for ProtocolVersion: " +
+                    cnx.getRemoteEndpointProtocolVersion()));
+                // The future is settled; do not send a command the peer cannot handle.
+                return;
+            }
+
+            long requestId = client.newRequestId();
+            ByteBuf getLastIdCmd = Commands.newGetLastMessageId(consumerId, requestId);
+            log.info("[{}][{}] Get topic last message Id", topic, subscription);
+
+            cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept((result) -> {
+                log.info("[{}][{}] Successfully getLastMessageId {}:{}",
+                    topic, subscription, result.getLedgerId(), result.getEntryId());
+                future.complete(new MessageIdImpl(result.getLedgerId(),
+                    result.getEntryId(), result.getPartition()));
+            }).exceptionally(e -> {
+                // Unwrap the completion wrapper when there is one; never hand null to
+                // completeExceptionally, which would throw and leave the future pending.
+                Throwable cause = e.getCause() != null ? e.getCause() : e;
+                log.error("[{}][{}] Failed getLastMessageId command", topic, subscription, cause);
+                future.completeExceptionally(cause);
+                return null;
+            });
+        } else {
+            long nextDelay = Math.min(backoff.next(), remainingTime.get());
+            if (nextDelay <= 0) {
+                future.completeExceptionally(new PulsarClientException
+                    .TimeoutException("Could not getLastMessageId within configured timeout."));
+                return;
+            }
+
+            // Retry after the delay, charging it against the remaining budget.
+            ((ScheduledExecutorService) listenerExecutor).schedule(() -> {
+                log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms",
+                    topic, getHandlerName(), nextDelay);
+                remainingTime.addAndGet(-nextDelay);
+                internalGetLastMessageIdAsync(backoff, remainingTime, future);
+            }, nextDelay, TimeUnit.MILLISECONDS);
+        }
+    }
+
     private MessageIdImpl getMessageIdImpl(Message msg) {
         MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId();
         if (messageId instanceof BatchMessageIdImpl) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 5b50ef5..bf0dc2f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -23,7 +23,6 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
@@ -133,4 +132,14 @@ public class ReaderImpl implements Reader {
         return consumer.closeAsync();
     }
 
+    /**
+     * Delegates to {@code consumer.hasMessageAvailable()} and returns its result.
+     *
+     * @throws PulsarClientException propagated unchanged from the consumer call
+     */
+    @Override
+    public boolean hasMessageAvailable() throws PulsarClientException {
+        return consumer.hasMessageAvailable();
+    }
+
+    /**
+     * Delegates to {@code consumer.hasMessageAvailableAsync()} and returns the same future.
+     */
+    @Override
+    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        return consumer.hasMessageAvailableAsync();
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index cb60720..907c52f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -41,7 +41,7 @@ public class ExecutorProvider {
         checkNotNull(threadNamePrefix);
         executors = Lists.newArrayListWithCapacity(numThreads);
         for (int i = 0; i < numThreads; i++) {
-            executors.add(Executors.newSingleThreadExecutor(new 
DefaultThreadFactory(threadNamePrefix)));
+            executors.add(Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory(threadNamePrefix)));
         }
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 4376b01..2912a73 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -21,11 +21,14 @@ package org.apache.pulsar.common.api;
 import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
 import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
 
+import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
@@ -40,6 +43,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
@@ -67,12 +71,6 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 
-import com.google.protobuf.ByteString;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
 public class Commands {
 
     public static final short magicCrc32c = 0x0e01;
@@ -123,7 +121,7 @@ public class Commands {
         if (originalAuthData != null) {
             connectBuilder.setOriginalAuthData(originalAuthData);
         }
-        
+
         if (originalAuthMethod != null) {
             connectBuilder.setOriginalAuthMethod(originalAuthMethod);
         }
@@ -637,6 +635,29 @@ public class Commands {
         return cmdPong.retainedDuplicate();
     }
 
+    /**
+     * Serializes a {@code GET_LAST_MESSAGE_ID} command.
+     *
+     * @param consumerId value stored in the command's consumer id field
+     * @param requestId value stored in the command's request id field
+     * @return the buffer produced by {@code serializeWithSize}
+     */
+    public static ByteBuf newGetLastMessageId(long consumerId, long requestId) {
+        CommandGetLastMessageId.Builder getLastIdBuilder = CommandGetLastMessageId.newBuilder()
+            .setConsumerId(consumerId)
+            .setRequestId(requestId);
+        BaseCommand.Builder envelope = BaseCommand.newBuilder()
+            .setType(Type.GET_LAST_MESSAGE_ID)
+            .setGetLastMessageId(getLastIdBuilder.build());
+
+        ByteBuf serialized = serializeWithSize(envelope);
+        // The command builder is pooled; hand it back once the bytes are written.
+        getLastIdBuilder.recycle();
+        return serialized;
+    }
+
+    /**
+     * Serializes a {@code GET_LAST_MESSAGE_ID_RESPONSE} command.
+     *
+     * @param requestId value stored in the response's request id field
+     * @param messageIdData value stored in the response's last message id field
+     * @return the buffer produced by {@code serializeWithSize}
+     */
+    public static ByteBuf newGetLastMessageIdResponse(long requestId, MessageIdData messageIdData) {
+        PulsarApi.CommandGetLastMessageIdResponse.Builder responseBuilder =
+            PulsarApi.CommandGetLastMessageIdResponse.newBuilder()
+                .setLastMessageId(messageIdData)
+                .setRequestId(requestId);
+        BaseCommand.Builder envelope = BaseCommand.newBuilder()
+            .setType(Type.GET_LAST_MESSAGE_ID_RESPONSE)
+            .setGetLastMessageIdResponse(responseBuilder.build());
+
+        ByteBuf serialized = serializeWithSize(envelope);
+        // Hand the response builder back once the bytes are written.
+        responseBuilder.recycle();
+        return serialized;
+    }
+
     private static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) {
         // / Wire format
         // [TOTAL_SIZE] [CMD_SIZE][CMD]
@@ -931,4 +952,8 @@ public class Commands {
         lookupBroker.recycle();
         return res;
     }
+
+    /**
+     * Tells whether a peer's protocol version is at least {@code ProtocolVersion.v12}.
+     *
+     * @param peerVersion protocol version number reported by the peer
+     * @return {@code true} when {@code peerVersion} is not below the number of {@code v12}
+     */
+    public static boolean peerSupportsGetLastMessageId(int peerVersion) {
+        final int minimumVersion = ProtocolVersion.v12.getNumber();
+        return peerVersion >= minimumVersion;
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index e9e113a..8d2389b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.api;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
@@ -253,6 +254,18 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                 handleReachedEndOfTopic(cmd.getReachedEndOfTopic());
                 cmd.getReachedEndOfTopic().recycle();
                 break;
+
+            case GET_LAST_MESSAGE_ID:
+                checkArgument(cmd.hasGetLastMessageId());
+                handleGetLastMessageId(cmd.getGetLastMessageId());
+                cmd.getGetLastMessageId().recycle();
+                break;
+
+            case GET_LAST_MESSAGE_ID_RESPONSE:
+                checkArgument(cmd.hasGetLastMessageIdResponse());
+                
handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse());
+                cmd.getGetLastMessageIdResponse().recycle();
+                break;
             }
         } finally {
             if (cmdBuilder != null) {
@@ -377,5 +390,12 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Hook called by the command switch for {@code GET_LAST_MESSAGE_ID}.
+     *
+     * <p>This base implementation always throws {@link UnsupportedOperationException}.
+     *
+     * @param getLastMessageId the decoded command
+     */
+    protected void handleGetLastMessageId(PulsarApi.CommandGetLastMessageId 
getLastMessageId) {
+        throw new UnsupportedOperationException();
+    }
+    /**
+     * Hook called by the command switch for {@code GET_LAST_MESSAGE_ID_RESPONSE}.
+     *
+     * <p>This base implementation always throws {@link UnsupportedOperationException}.
+     *
+     * @param success the decoded response command
+     */
+    protected void 
handleGetLastMessageIdSuccess(PulsarApi.CommandGetLastMessageIdResponse 
success) {
+        throw new UnsupportedOperationException();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PulsarDecoder.class);
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 2c13125..27cc63a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -199,6 +199,7 @@ public final class PulsarApi {
     v9(9, 9),
     v10(10, 10),
     v11(11, 11),
+    v12(12, 12),
     ;
     
     public static final int v0_VALUE = 0;
@@ -213,6 +214,7 @@ public final class PulsarApi {
     public static final int v9_VALUE = 9;
     public static final int v10_VALUE = 10;
     public static final int v11_VALUE = 11;
+    public static final int v12_VALUE = 12;
     
     
     public final int getNumber() { return value; }
@@ -231,6 +233,7 @@ public final class PulsarApi {
         case 9: return v9;
         case 10: return v10;
         case 11: return v11;
+        case 12: return v12;
         default: return null;
       }
     }
@@ -20067,6 +20070,831 @@ public final class PulsarApi {
     // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandConsumerStatsResponse)
   }
   
+  public interface CommandGetLastMessageIdOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required uint64 consumer_id = 1;
+    boolean hasConsumerId();
+    long getConsumerId();
+    
+    // required uint64 request_id = 2;
+    boolean hasRequestId();
+    long getRequestId();
+  }
+  public static final class CommandGetLastMessageId extends
+      com.google.protobuf.GeneratedMessageLite
+      implements CommandGetLastMessageIdOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use CommandGetLastMessageId.newBuilder() to construct.
+    private final io.netty.util.Recycler.Handle<CommandGetLastMessageId> 
handle;
+    private 
CommandGetLastMessageId(io.netty.util.Recycler.Handle<CommandGetLastMessageId> 
handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandGetLastMessageId> 
RECYCLER = new io.netty.util.Recycler<CommandGetLastMessageId>() {
+            protected CommandGetLastMessageId 
newObject(Handle<CommandGetLastMessageId> handle) {
+              return new CommandGetLastMessageId(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this);
+        }
+         
+    private CommandGetLastMessageId(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final CommandGetLastMessageId defaultInstance;
+    public static CommandGetLastMessageId getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandGetLastMessageId getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 consumer_id = 1;
+    public static final int CONSUMER_ID_FIELD_NUMBER = 1;
+    private long consumerId_;
+    public boolean hasConsumerId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getConsumerId() {
+      return consumerId_;
+    }
+    
+    // required uint64 request_id = 2;
+    public static final int REQUEST_ID_FIELD_NUMBER = 2;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
+    private void initFields() {
+      consumerId_ = 0L;
+      requestId_ = 0L;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasConsumerId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasRequestId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, consumerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeUInt64(2, requestId_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(1, consumerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(2, requestId_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId, Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> 
handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        consumerId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
result = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.consumerId_ = consumerId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.requestId_ = requestId_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
other) {
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance())
 return this;
+        if (other.hasConsumerId()) {
+          setConsumerId(other.getConsumerId());
+        }
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasConsumerId()) {
+          
+          return false;
+        }
+        if (!hasRequestId()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              consumerId_ = input.readUInt64();
+              break;
+            }
+            case 16: {
+              bitField0_ |= 0x00000002;
+              requestId_ = input.readUInt64();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required uint64 consumer_id = 1;
+      private long consumerId_ ;
+      public boolean hasConsumerId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getConsumerId() {
+        return consumerId_;
+      }
+      public Builder setConsumerId(long value) {
+        bitField0_ |= 0x00000001;
+        consumerId_ = value;
+        
+        return this;
+      }
+      public Builder clearConsumerId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        consumerId_ = 0L;
+        
+        return this;
+      }
+      
+      // required uint64 request_id = 2;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public long getRequestId() {
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000002;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
+      // 
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetLastMessageId)
+    }
+    
+    static {
+      defaultInstance = new CommandGetLastMessageId(true);
+      defaultInstance.initFields();
+    }
+    
+    // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandGetLastMessageId)
+  }
+  
+  /**
+   * Read-only accessors for the two fields of CommandGetLastMessageIdResponse
+   * (last_message_id and request_id). Implemented by both the message class and
+   * its Builder.
+   */
+  public interface CommandGetLastMessageIdResponseOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required .pulsar.proto.MessageIdData last_message_id = 1;
+    boolean hasLastMessageId();
+    org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
getLastMessageId();
+    
+    // required uint64 request_id = 2;
+    boolean hasRequestId();
+    long getRequestId();
+  }
+  public static final class CommandGetLastMessageIdResponse extends
+      com.google.protobuf.GeneratedMessageLite
+      implements CommandGetLastMessageIdResponseOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use CommandGetLastMessageIdResponse.newBuilder() to construct.
+    // Recycler handle this instance was created with; null only for the default
+    // instance, which is built through the boolean constructor.
+    private final 
io.netty.util.Recycler.Handle<CommandGetLastMessageIdResponse> handle;
+    // Constructor used by RECYCLER.newObject() for pooled instances.
+    private 
CommandGetLastMessageIdResponse(io.netty.util.Recycler.Handle<CommandGetLastMessageIdResponse>
 handle) {
+      this.handle = handle;
+    }
+    
+    // Netty Recycler that supplies message instances; Builder.buildPartial() obtains
+    // its result object from here via RECYCLER.get().
+     private static final 
io.netty.util.Recycler<CommandGetLastMessageIdResponse> RECYCLER = new 
io.netty.util.Recycler<CommandGetLastMessageIdResponse>() {
+            protected CommandGetLastMessageIdResponse 
newObject(Handle<CommandGetLastMessageIdResponse> handle) {
+              return new CommandGetLastMessageIdResponse(handle);
+            }
+          };
+        
+        /**
+         * Resets all fields and memoized values of this message and hands the
+         * instance back to the Recycler through its handle.
+         *
+         * The shared default instance is created through the boolean constructor
+         * with a null handle and does not come from the Recycler. For it this
+         * method is a no-op, where it previously threw NullPointerException after
+         * having already reset the shared instance's state.
+         */
+        public void recycle() {
+            if (handle == null) {
+                // Default instance: nothing to return to the pool.
+                return;
+            }
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this);
+        }
+         
+    // Constructor used by the static initializer for the default instance; it leaves
+    // the Recycler handle null. The noInit argument is not read.
+    private CommandGetLastMessageIdResponse(boolean noInit) {
+        this.handle = null;
+    }
+    
+    // Shared default instance, assigned once in this class's static initializer.
+    private static final CommandGetLastMessageIdResponse defaultInstance;
+    /** Returns the shared default instance. */
+    public static CommandGetLastMessageIdResponse getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    /** Instance-level accessor for the same shared default instance. */
+    public CommandGetLastMessageIdResponse getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    // Presence bits: 0x00000001 = last_message_id, 0x00000002 = request_id.
+    private int bitField0_;
+    // required .pulsar.proto.MessageIdData last_message_id = 1;
+    public static final int LAST_MESSAGE_ID_FIELD_NUMBER = 1;
+    private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData lastMessageId_;
+    /** Tells whether last_message_id is flagged as present. */
+    public boolean hasLastMessageId() {
+      return (bitField0_ & 0x00000001) != 0;
+    }
+    /** Returns the value currently held in lastMessageId_. */
+    public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getLastMessageId() {
+      return lastMessageId_;
+    }
+    
+    // required uint64 request_id = 2;
+    public static final int REQUEST_ID_FIELD_NUMBER = 2;
+    private long requestId_;
+    /** Tells whether request_id is flagged as present. */
+    public boolean hasRequestId() {
+      return (bitField0_ & 0x00000002) != 0;
+    }
+    /** Returns the value currently held in requestId_. */
+    public long getRequestId() {
+      return requestId_;
+    }
+    
+    /**
+     * Puts both fields back to their defaults: request_id to 0 and last_message_id
+     * to the MessageIdData default instance. Presence bits are not touched here.
+     */
+    private void initFields() {
+      requestId_ = 0L;
+      lastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+    }
+    // Cached answer of isInitialized(): -1 = not computed, 0 = false, 1 = true.
+    private byte memoizedIsInitialized = -1;
+    /**
+     * Returns true when both required fields are present and the nested
+     * last_message_id message is itself initialized. The answer is cached in
+     * memoizedIsInitialized after the first evaluation.
+     */
+    public final boolean isInitialized() {
+      final byte cached = memoizedIsInitialized;
+      if (cached != -1) {
+        return cached == 1;
+      }
+      final boolean complete = hasLastMessageId()
+          && hasRequestId()
+          && getLastMessageId().isInitialized();
+      memoizedIsInitialized = (byte) (complete ? 1 : 0);
+      return complete;
+    }
+    
+    /** Not supported for this class: always throws RuntimeException. */
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    /**
+     * Writes every field that is flagged as present to {@code output}:
+     * last_message_id as field 1, then request_id as field 2.
+     */
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      // Result is discarded; the call makes sure memoizedSerializedSize is computed.
+      getSerializedSize();
+      if (hasLastMessageId()) {
+        output.writeMessage(1, lastMessageId_);
+      }
+      if (hasRequestId()) {
+        output.writeUInt64(2, requestId_);
+      }
+    }
+    
+    // Cached result of getSerializedSize(); -1 means it has not been computed yet.
+    private int memoizedSerializedSize = -1;
+    /**
+     * Returns the summed encoded size of the fields flagged as present, computing
+     * it on the first call and serving the cached value afterwards.
+     */
+    public int getSerializedSize() {
+      if (memoizedSerializedSize != -1) {
+        return memoizedSerializedSize;
+      }
+    
+      int total = 0;
+      if (hasLastMessageId()) {
+        total += com.google.protobuf.CodedOutputStream.computeMessageSize(1, lastMessageId_);
+      }
+      if (hasRequestId()) {
+        total += com.google.protobuf.CodedOutputStream.computeUInt64Size(2, requestId_);
+      }
+      memoizedSerializedSize = total;
+      return total;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    // Java-serialization hook; simply defers to the superclass implementation.
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    /** Parsing from a ByteString is switched off: always throws RuntimeException. */
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      throw new RuntimeException("Disabled");
+    }
+    /** Parsing from a ByteString is switched off: always throws RuntimeException. */
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      throw new RuntimeException("Disabled");
+    }
+    // NOTE(review): the byte[] / InputStream overloads below go through the inherited
+    // Builder.mergeFrom(...) methods; these presumably end up in
+    // Builder.mergeFrom(CodedInputStream, ExtensionRegistryLite), which this class
+    // disables -- confirm before relying on them.
+    /** Merges {@code data} into a fresh builder and returns the checked result. */
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      Builder merged = newBuilder().mergeFrom(data);
+      return merged.buildParsed();
+    }
+    /** Same as above, passing {@code extensionRegistry} through to the merge. */
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      Builder merged = newBuilder().mergeFrom(data, extensionRegistry);
+      return merged.buildParsed();
+    }
+    /** Merges the content of {@code input} into a fresh builder and returns the checked result. */
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder merged = newBuilder().mergeFrom(input);
+      return merged.buildParsed();
+    }
+    /** Same as above, passing {@code extensionRegistry} through to the merge. */
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder merged = newBuilder().mergeFrom(input, extensionRegistry);
+      return merged.buildParsed();
+    }
+    /**
+     * Merges one delimited message from {@code input} into a fresh builder.
+     * Returns the checked result, or null when mergeDelimitedFrom reports false.
+     */
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      return builder.mergeDelimitedFrom(input) ? builder.buildParsed() : null;
+    }
+    /** Same as above, passing {@code extensionRegistry} through to the merge. */
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      return builder.mergeDelimitedFrom(input, extensionRegistry) ? builder.buildParsed() : null;
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse, 
Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponseOrBuilder,
 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.newBuilder()
+      // Recycler handle this builder was created with (set by the only constructor).
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      // Netty Recycler that supplies Builder instances; create() draws from it.
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> 
handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       // Clears all builder state, then hands this builder back through its handle.
+       public void recycle() {
+                clear();
+                handle.recycle(this);
+            }
+      
+      // Intentionally empty in this class.
+      private void maybeForceBuilderInitialization() {
+      }
+      // Single factory for builders: every Builder comes from RECYCLER.get().
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      /**
+       * Resets the builder: after the superclass clear, both fields return to their
+       * defaults and both presence bits are dropped.
+       */
+      public Builder clear() {
+        super.clear();
+        requestId_ = 0L;
+        lastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+        bitField0_ &= ~(0x00000001 | 0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      /** Returns the shared default instance of the message type this builder produces. */
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
+      }
+      
+      /**
+       * Builds the message and verifies it with isInitialized(); an incomplete
+       * message is reported through newUninitializedMessageException.
+       */
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse build() {
+        final org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse message = buildPartial();
+        if (message.isInitialized()) {
+          return message;
+        }
+        throw newUninitializedMessageException(message);
+      }
+      
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      /**
+       * Takes a message object from the message RECYCLER and copies this builder's
+       * field values and presence bits into it, without checking required fields.
+       */
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse buildPartial() {
+        final org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse message =
+            org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.RECYCLER.get();
+        message.lastMessageId_ = lastMessageId_;
+        message.requestId_ = requestId_;
+        // Carry over exactly the two presence bits this message defines.
+        message.bitField0_ = bitField0_ & (0x00000001 | 0x00000002);
+        return message;
+      }
+      
+      /**
+       * Copies every field that is present in {@code other} into this builder:
+       * last_message_id is merged, request_id is overwritten. Merging the default
+       * instance changes nothing.
+       */
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse other) {
+        if (other != org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance()) {
+          if (other.hasLastMessageId()) {
+            mergeLastMessageId(other.getLastMessageId());
+          }
+          if (other.hasRequestId()) {
+            setRequestId(other.getRequestId());
+          }
+        }
+        return this;
+      }
+      
+      /**
+       * Returns true when both required fields are set on this builder and the
+       * nested last_message_id message is itself initialized.
+       */
+      public final boolean isInitialized() {
+        return hasLastMessageId()
+            && hasRequestId()
+            && getLastMessageId().isInitialized();
+      }
+      
+      // Parsing from a protobuf CodedInputStream is switched off for this builder:
+      // the method always throws IOException. Parsing is done by the
+      // ByteBufCodedInputStream overload instead.
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      /**
+       * Reads tags from {@code input} until a zero tag is returned or a field cannot
+       * be skipped, storing last_message_id (tag 10) and request_id (tag 16) as they
+       * are encountered. Any other tag is skipped.
+       */
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          final int fieldTag = input.readTag();
+          switch (fieldTag) {
+            case 0:
+              // A zero tag stops the loop.
+              return this;
+            case 10: {
+              // Nested message: start from the value already set (if any), read into
+              // a temporary sub-builder, store the result, then recycle the sub-builder.
+              org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder nested =
+                  org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder();
+              if (hasLastMessageId()) {
+                nested.mergeFrom(getLastMessageId());
+              }
+              input.readMessage(nested, extensionRegistry);
+              setLastMessageId(nested.buildPartial());
+              nested.recycle();
+              break;
+            }
+            case 16: {
+              bitField0_ |= 0x00000002;
+              requestId_ = input.readUInt64();
+              break;
+            }
+            default: {
+              if (!input.skipField(fieldTag)) {
+                return this;
+              }
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required .pulsar.proto.MessageIdData last_message_id = 1;
+      // Presence of this field is tracked by bit 0x00000001 of bitField0_.
+      private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData lastMessageId_ =
+          org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+      /** Tells whether last_message_id has been set on this builder. */
+      public boolean hasLastMessageId() {
+        return (bitField0_ & 0x00000001) != 0;
+      }
+      /** Returns the value currently held in lastMessageId_. */
+      public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getLastMessageId() {
+        return lastMessageId_;
+      }
+      /** Stores {@code value} and flags the field as present; null is rejected. */
+      public Builder setLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        lastMessageId_ = value;
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      /** Stores the message built by {@code builderForValue} and flags the field as present. */
+      public Builder setLastMessageId(
+          org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder builderForValue) {
+        lastMessageId_ = builderForValue.build();
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      /**
+       * Merges {@code value} into the current last_message_id when one is already set
+       * (and is not the default instance); otherwise simply adopts {@code value}.
+       */
+      public Builder mergeLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData value) {
+        final boolean mergeIntoExisting = hasLastMessageId()
+            && lastMessageId_ != org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+        lastMessageId_ = mergeIntoExisting
+            ? org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder(lastMessageId_).mergeFrom(value).buildPartial()
+            : value;
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      /** Resets last_message_id to the default instance and flags it as absent. */
+      public Builder clearLastMessageId() {
+        lastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+        bitField0_ &= ~0x00000001;
+        return this;
+      }
+      
+      // required uint64 request_id = 2;
+      // Presence of this field is tracked by bit 0x00000002 of bitField0_.
+      private long requestId_;
+      /** Tells whether request_id has been set on this builder. */
+      public boolean hasRequestId() {
+        return (bitField0_ & 0x00000002) != 0;
+      }
+      /** Returns the value currently held in requestId_. */
+      public long getRequestId() {
+        return requestId_;
+      }
+      /** Stores {@code value} and flags request_id as present. */
+      public Builder setRequestId(long value) {
+        requestId_ = value;
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      /** Flags request_id as absent and resets the stored value to 0. */
+      public Builder clearRequestId() {
+        requestId_ = 0L;
+        bitField0_ &= ~0x00000002;
+        return this;
+      }
+      
+      // 
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetLastMessageIdResponse)
+    }
+    
+    // Creates the shared default instance through the boolean constructor (which
+    // leaves its Recycler handle null), then fills in its field defaults.
+    static {
+      defaultInstance = new CommandGetLastMessageIdResponse(true);
+      defaultInstance.initFields();
+    }
+    
+    // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandGetLastMessageIdResponse)
+  }
+  
   public interface BaseCommandOrBuilder
       extends com.google.protobuf.MessageLiteOrBuilder {
     
@@ -20181,6 +21009,14 @@ public final class PulsarApi {
     // optional .pulsar.proto.CommandSeek seek = 28;
     boolean hasSeek();
     org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek getSeek();
+    
+    // optional .pulsar.proto.CommandGetLastMessageId getLastMessageId = 29;
+    boolean hasGetLastMessageId();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId 
getGetLastMessageId();
+    
+    // optional .pulsar.proto.CommandGetLastMessageIdResponse 
getLastMessageIdResponse = 30;
+    boolean hasGetLastMessageIdResponse();
+    
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
getGetLastMessageIdResponse();
   }
   public static final class BaseCommand extends
       com.google.protobuf.GeneratedMessageLite
@@ -20247,6 +21083,8 @@ public final class PulsarApi {
       CONSUMER_STATS_RESPONSE(24, 26),
       REACHED_END_OF_TOPIC(25, 27),
       SEEK(26, 28),
+      GET_LAST_MESSAGE_ID(27, 29),
+      GET_LAST_MESSAGE_ID_RESPONSE(28, 30),
       ;
       
       public static final int CONNECT_VALUE = 2;
@@ -20276,6 +21114,8 @@ public final class PulsarApi {
       public static final int CONSUMER_STATS_RESPONSE_VALUE = 26;
       public static final int REACHED_END_OF_TOPIC_VALUE = 27;
       public static final int SEEK_VALUE = 28;
+      public static final int GET_LAST_MESSAGE_ID_VALUE = 29;
+      public static final int GET_LAST_MESSAGE_ID_RESPONSE_VALUE = 30;
       
       
       public final int getNumber() { return value; }
@@ -20309,6 +21149,8 @@ public final class PulsarApi {
           case 26: return CONSUMER_STATS_RESPONSE;
           case 27: return REACHED_END_OF_TOPIC;
           case 28: return SEEK;
+          case 29: return GET_LAST_MESSAGE_ID;
+          case 30: return GET_LAST_MESSAGE_ID_RESPONSE;
           default: return null;
         }
       }
@@ -20615,6 +21457,26 @@ public final class PulsarApi {
       return seek_;
     }
     
+    // optional .pulsar.proto.CommandGetLastMessageId getLastMessageId = 29;
+    // Presence of this field is tracked by bit 0x10000000 of bitField0_.
+    public static final int GETLASTMESSAGEID_FIELD_NUMBER = 29;
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId getLastMessageId_;
+    /** Tells whether the getLastMessageId sub-command is flagged as present. */
+    public boolean hasGetLastMessageId() {
+      return (bitField0_ & 0x10000000) != 0;
+    }
+    /** Returns the value currently held in getLastMessageId_. */
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId getGetLastMessageId() {
+      return getLastMessageId_;
+    }
+    
+    // optional .pulsar.proto.CommandGetLastMessageIdResponse getLastMessageIdResponse = 30;
+    // Presence of this field is tracked by bit 0x20000000 of bitField0_.
+    public static final int GETLASTMESSAGEIDRESPONSE_FIELD_NUMBER = 30;
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getLastMessageIdResponse_;
+    /** Tells whether the getLastMessageIdResponse sub-command is flagged as present. */
+    public boolean hasGetLastMessageIdResponse() {
+      return (bitField0_ & 0x20000000) != 0;
+    }
+    /** Returns the value currently held in getLastMessageIdResponse_. */
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getGetLastMessageIdResponse() {
+      return getLastMessageIdResponse_;
+    }
+    
     private void initFields() {
       type_ = 
org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT;
       connect_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance();
@@ -20644,6 +21506,8 @@ public final class PulsarApi {
       consumerStatsResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse.getDefaultInstance();
       reachedEndOfTopic_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopic.getDefaultInstance();
       seek_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance();
+      getLastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
+      getLastMessageIdResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -20804,6 +21668,18 @@ public final class PulsarApi {
           return false;
         }
       }
+      if (hasGetLastMessageId()) {
+        if (!getGetLastMessageId().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      if (hasGetLastMessageIdResponse()) {
+        if (!getGetLastMessageIdResponse().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -20900,6 +21776,12 @@ public final class PulsarApi {
       if (((bitField0_ & 0x08000000) == 0x08000000)) {
         output.writeMessage(28, seek_);
       }
+      if (((bitField0_ & 0x10000000) == 0x10000000)) {
+        output.writeMessage(29, getLastMessageId_);
+      }
+      if (((bitField0_ & 0x20000000) == 0x20000000)) {
+        output.writeMessage(30, getLastMessageIdResponse_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -21020,6 +21902,14 @@ public final class PulsarApi {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(28, seek_);
       }
+      if (((bitField0_ & 0x10000000) == 0x10000000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(29, getLastMessageId_);
+      }
+      if (((bitField0_ & 0x20000000) == 0x20000000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(30, getLastMessageIdResponse_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -21189,6 +22079,10 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x04000000);
         seek_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance();
         bitField0_ = (bitField0_ & ~0x08000000);
+        getLastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x10000000);
+        getLastMessageIdResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x20000000);
         return this;
       }
       
@@ -21334,6 +22228,14 @@ public final class PulsarApi {
           to_bitField0_ |= 0x08000000;
         }
         result.seek_ = seek_;
+        if (((from_bitField0_ & 0x10000000) == 0x10000000)) {
+          to_bitField0_ |= 0x10000000;
+        }
+        result.getLastMessageId_ = getLastMessageId_;
+        if (((from_bitField0_ & 0x20000000) == 0x20000000)) {
+          to_bitField0_ |= 0x20000000;
+        }
+        result.getLastMessageIdResponse_ = getLastMessageIdResponse_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -21424,6 +22326,12 @@ public final class PulsarApi {
         if (other.hasSeek()) {
           mergeSeek(other.getSeek());
         }
+        if (other.hasGetLastMessageId()) {
+          mergeGetLastMessageId(other.getGetLastMessageId());
+        }
+        if (other.hasGetLastMessageIdResponse()) {
+          mergeGetLastMessageIdResponse(other.getGetLastMessageIdResponse());
+        }
         return this;
       }
       
@@ -21582,6 +22490,18 @@ public final class PulsarApi {
             return false;
           }
         }
+        if (hasGetLastMessageId()) {
+          if (!getGetLastMessageId().isInitialized()) {
+            
+            return false;
+          }
+        }
+        if (hasGetLastMessageIdResponse()) {
+          if (!getGetLastMessageIdResponse().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -21886,6 +22806,26 @@ public final class PulsarApi {
               subBuilder.recycle();
               break;
             }
+            case 234: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.Builder 
subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.newBuilder();
+              if (hasGetLastMessageId()) {
+                subBuilder.mergeFrom(getGetLastMessageId());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetLastMessageId(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
+            case 242: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.Builder
 subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.newBuilder();
+              if (hasGetLastMessageIdResponse()) {
+                subBuilder.mergeFrom(getGetLastMessageIdResponse());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetLastMessageIdResponse(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
@@ -23077,6 +24017,92 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional .pulsar.proto.CommandGetLastMessageId getLastMessageId = 29;
+      // Presence of this field is tracked by bit 0x10000000 of bitField0_.
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId getLastMessageId_ =
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
+      /** Tells whether getLastMessageId has been set on this builder. */
+      public boolean hasGetLastMessageId() {
+        return (bitField0_ & 0x10000000) != 0;
+      }
+      /** Returns the value currently held in getLastMessageId_. */
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId getGetLastMessageId() {
+        return getLastMessageId_;
+      }
+      /** Stores {@code value} and flags the field as present; null is rejected. */
+      public Builder setGetLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        getLastMessageId_ = value;
+        bitField0_ |= 0x10000000;
+        return this;
+      }
+      /** Stores the message built by {@code builderForValue} and flags the field as present. */
+      public Builder setGetLastMessageId(
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.Builder builderForValue) {
+        getLastMessageId_ = builderForValue.build();
+        bitField0_ |= 0x10000000;
+        return this;
+      }
+      /**
+       * Merges {@code value} into the current getLastMessageId when one is already set
+       * (and is not the default instance); otherwise simply adopts {@code value}.
+       */
+      public Builder mergeGetLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId value) {
+        final boolean mergeIntoExisting = hasGetLastMessageId()
+            && getLastMessageId_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
+        getLastMessageId_ = mergeIntoExisting
+            ? org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.newBuilder(getLastMessageId_).mergeFrom(value).buildPartial()
+            : value;
+        bitField0_ |= 0x10000000;
+        return this;
+      }
+      /** Resets getLastMessageId to the default instance and flags it as absent. */
+      public Builder clearGetLastMessageId() {
+        getLastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
+        bitField0_ &= ~0x10000000;
+        return this;
+      }
+      
+      // optional .pulsar.proto.CommandGetLastMessageIdResponse getLastMessageIdResponse = 30;
+      // Presence of this field is tracked by bit 0x20000000 of bitField0_.
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getLastMessageIdResponse_ =
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
+      /** Tells whether getLastMessageIdResponse has been set on this builder. */
+      public boolean hasGetLastMessageIdResponse() {
+        return (bitField0_ & 0x20000000) != 0;
+      }
+      /** Returns the value currently held in getLastMessageIdResponse_. */
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getGetLastMessageIdResponse() {
+        return getLastMessageIdResponse_;
+      }
+      /** Stores {@code value} and flags the field as present; null is rejected. */
+      public Builder setGetLastMessageIdResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        getLastMessageIdResponse_ = value;
+        bitField0_ |= 0x20000000;
+        return this;
+      }
+      /** Stores the message built by {@code builderForValue} and flags the field as present. */
+      public Builder setGetLastMessageIdResponse(
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.Builder builderForValue) {
+        getLastMessageIdResponse_ = builderForValue.build();
+        bitField0_ |= 0x20000000;
+        return this;
+      }
+      /**
+       * Merges {@code value} into the current getLastMessageIdResponse when one is
+       * already set (and is not the default instance); otherwise simply adopts
+       * {@code value}.
+       */
+      public Builder mergeGetLastMessageIdResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse value) {
+        final boolean mergeIntoExisting = hasGetLastMessageIdResponse()
+            && getLastMessageIdResponse_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
+        getLastMessageIdResponse_ = mergeIntoExisting
+            ? org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.newBuilder(getLastMessageIdResponse_).mergeFrom(value).buildPartial()
+            : value;
+        bitField0_ |= 0x20000000;
+        return this;
+      }
+      /** Resets getLastMessageIdResponse to the default instance and flags it as absent. */
+      public Builder clearGetLastMessageIdResponse() {
+        getLastMessageIdResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
+        bitField0_ &= ~0x20000000;
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 964fd52..ca3d2fb 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -135,6 +135,7 @@ enum ProtocolVersion {
        v9 = 9;  // Added end of topic notification
        v10 = 10;// Added proxy to broker
        v11 = 11;// C++ consumers before this version are not correctly 
handling the checksum field
+       v12 = 12;// Added command to get a topic's last messageId from the broker
 }
 
 message CommandConnect {
@@ -152,13 +153,13 @@ message CommandConnect {
        // a Pulsar proxy. In this case the auth info above
        // will be the auth of the proxy itself
        optional string original_principal = 7;
-       
-       // Original auth role and auth Method that was passed 
+
+       // Original auth role and auth Method that was passed
        // to the proxy. In this case the auth info above
-       // will be the auth of the proxy itself 
+       // will be the auth of the proxy itself
        optional string original_auth_data = 8;
        optional string original_auth_method = 9;
-       
+
 }
 
 message CommandConnected {
@@ -202,9 +203,9 @@ message CommandPartitionedTopicMetadata {
        // Original principal that was verified by
        // a Pulsar proxy.
        optional string original_principal = 3;
-       
-       // Original auth role and auth Method that was passed 
-       // to the proxy.        
+
+       // Original auth role and auth Method that was passed
+       // to the proxy.
        optional string original_auth_data = 4;
        optional string original_auth_method = 5;
 }
@@ -225,13 +226,13 @@ message CommandLookupTopic {
        required string topic            = 1;
        required uint64 request_id       = 2;
        optional bool authoritative      = 3 [default = false];
-       
+
        // Original principal that was verified by
        // a Pulsar proxy.
        optional string original_principal = 4;
-       
-       // Original auth role and auth Method that was passed 
-       // to the proxy.        
+
+       // Original auth role and auth Method that was passed
+       // to the proxy.
        optional string original_auth_data = 5;
        optional string original_auth_method = 6;
 }
@@ -340,7 +341,7 @@ message CommandUnsubscribe {
 message CommandSeek {
        required uint64 consumer_id = 1;
        required uint64 request_id  = 2;
-       
+
        optional MessageIdData message_id = 3;
 }
 
@@ -374,7 +375,7 @@ message CommandSuccess {
 message CommandProducerSuccess {
        required uint64 request_id    = 1;
        required string producer_name = 2;
-       
+
        // The last sequence id that was stored by this producer in the previous session
        // This will only be meaningful if deduplication has been enabled.
        optional int64  last_sequence_id = 3 [default = -1];
@@ -443,6 +444,16 @@ message CommandConsumerStatsResponse {
         optional uint64 msgBacklog                  = 15;
 }
 
+message CommandGetLastMessageId {
+       required uint64 consumer_id = 1;
+       required uint64 request_id  = 2;
+}
+
+message CommandGetLastMessageIdResponse {
+       required MessageIdData last_message_id = 1;
+       required uint64 request_id  = 2;
+}
+
 message BaseCommand {
        enum Type {
                CONNECT     = 2;
@@ -486,6 +497,9 @@ message BaseCommand {
                REACHED_END_OF_TOPIC = 27;
 
                SEEK = 28;
+
+               GET_LAST_MESSAGE_ID = 29;
+               GET_LAST_MESSAGE_ID_RESPONSE = 30;
        }
 
        required Type type = 1;
@@ -524,6 +538,11 @@ message BaseCommand {
        optional CommandConsumerStatsResponse consumerStatsResponse         = 26;
 
        optional CommandReachedEndOfTopic reachedEndOfTopic  = 27;
-       
+
        optional CommandSeek seek = 28;
+
+       optional CommandGetLastMessageId getLastMessageId = 29;
+       optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30;
+
+
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to