[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168059642
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1256,102 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+return true;
+}
+
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture hasMessageAvailableAsync() {
+final CompletableFuture booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
+opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
+0 , TimeUnit.MILLISECONDS);
+CompletableFuture getLastMessageIdFuture = new 
CompletableFuture<>();
+
+internalGetLastMessageIdAsync(backoff, opTimeoutMs, 
getLastMessageIdFuture);
+return getLastMessageIdFuture;
+}
+
+private void internalGetLastMessageIdAsync(final Backoff backoff,
+   final AtomicLong remainingTime,
+   CompletableFuture 
future) {
+if (isConnected()) {
 
 Review comment:
   I missed this before, but we should make sure the connection doesn't change while we're executing this method.
   
   `isConnected()` checks the current connection, but that connection might change by the time we call `cnx()` a few lines below.
   
   We need to get a reference to the `ClientCnx` first and use that same reference throughout the method.
   
   ```java
   ClientCnx cnx = cnx();
   
   if (cnx != null) {
       // check cnx.getRemoteEndpointProtocolVersion();
       cnx.sendGetLastMessageId()...
   }
   ```
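
A minimal, self-contained sketch of the "read the reference once" pattern from the comment above. `FakeCnx` and `CnxHolder` are hypothetical stand-ins, not the real `ClientCnx` or `ConsumerImpl`; only the null check and the version accessor are modeled.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ClientCnx; only the version accessor is modeled.
final class FakeCnx {
    private final int remoteProtocolVersion;

    FakeCnx(int remoteProtocolVersion) {
        this.remoteProtocolVersion = remoteProtocolVersion;
    }

    int getRemoteEndpointProtocolVersion() {
        return remoteProtocolVersion;
    }
}

// Hypothetical holder whose connection may be swapped or cleared by another thread.
final class CnxHolder {
    private final AtomicReference<FakeCnx> current = new AtomicReference<>();

    void set(FakeCnx cnx) {
        current.set(cnx);
    }

    FakeCnx cnx() {
        return current.get();
    }

    // Reads the reference once, so the null check and the later use see the same object.
    boolean peerSupports(int requiredVersion) {
        FakeCnx cnx = cnx();
        if (cnx == null) {
            return false; // not connected; the caller can retry later
        }
        return cnx.getRemoteEndpointProtocolVersion() >= requiredVersion;
    }
}

final class CnxSnapshotDemo {
    public static void main(String[] args) {
        CnxHolder holder = new CnxHolder();
        System.out.println(holder.peerSupports(12)); // no connection yet
        holder.set(new FakeCnx(12));
        System.out.println(holder.peerSupports(12)); // connected to a v12 peer
    }
}
```

Because `peerSupports` never calls `cnx()` a second time, a concurrent disconnect between the check and the use cannot turn into a `NullPointerException` inside this method.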


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168054861
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -145,6 +151,9 @@
 this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
 this.readCompacted = conf.getReadCompacted();
 
+this.getLastIdExecutor = Executors
 
 Review comment:
   This would create one thread per consumer. We should reuse the executor that is already available from `PulsarClientImpl`.
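
As a rough illustration of the suggestion above, the sketch below has the client own a single scheduler that every consumer borrows. `SketchClient`, `SketchConsumer` and `timer()` are hypothetical names chosen for this example; the actual accessor on `PulsarClientImpl` is not shown in this thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical client that owns one scheduler for all of its consumers.
final class SketchClient implements AutoCloseable {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    ScheduledExecutorService timer() {
        return timer;
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}

// Hypothetical consumer that borrows the client's scheduler instead of creating its own.
final class SketchConsumer {
    private final ScheduledExecutorService timer;

    SketchConsumer(SketchClient client) {
        this.timer = client.timer();
    }

    ScheduledExecutorService timer() {
        return timer;
    }
}

final class SharedExecutorDemo {
    public static void main(String[] args) {
        try (SketchClient client = new SketchClient()) {
            SketchConsumer a = new SketchConsumer(client);
            SketchConsumer b = new SketchConsumer(client);
            // Both consumers schedule work on the same executor instance.
            System.out.println(a.timer() == b.timer());
        }
    }
}
```

With this layout the number of scheduler threads stays constant no matter how many consumers are created, and shutting the client down also stops the shared scheduler.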




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168054738
 
 

 ##
 File path: 
pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
 ##
 @@ -952,4 +952,8 @@ public static ByteBuf newLookup(String topic, boolean 
authoritative, String orig
 lookupBroker.recycle();
 return res;
 }
+
+public static boolean peerSupportsGetLastMessageId() {
+return getCurrentProtocolVersion() >= ProtocolVersion.v12.getNumber();
 
 Review comment:
   This only checks our own protocol version (which is always the "latest" as of when we compiled the protobuf), but we need to check the version of the other side, in this case the broker.
   
   It should look like this:
   
   ```java
   public static boolean peerSupportsGetLastMessageId(ClientCnx cnx) {
       return cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v12.getNumber();
   }
   ```
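
To make the difference concrete, here is a small sketch that contrasts the two checks. The constants and the `VersionCheckSketch` class are assumptions for illustration only; they do not come from `Commands` or `ProtocolVersion`.

```java
final class VersionCheckSketch {
    // Assumed values for illustration: the version this client was compiled with,
    // and the first version that understands the new command.
    static final int LOCAL_PROTOCOL_VERSION = 12;
    static final int REQUIRED_VERSION = 12;

    // Flawed check: compares our own compiled version, so it is true for every peer.
    static boolean localSupports() {
        return LOCAL_PROTOCOL_VERSION >= REQUIRED_VERSION;
    }

    // Intended check: compares the version advertised by the remote endpoint.
    static boolean peerSupports(int remoteVersion) {
        return remoteVersion >= REQUIRED_VERSION;
    }

    public static void main(String[] args) {
        int olderBroker = 11;
        System.out.println("local check:  " + localSupports());
        System.out.println("remote check: " + peerSupports(olderBroker));
    }
}
```

Against an older broker the local check still passes, so the client would send a command the broker cannot parse; the remote check rejects it up front.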




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168005589
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1254,103 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+return true;
+}
+
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture hasMessageAvailableAsync() {
+final CompletableFuture booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+if (!isConnected()) {
+long opTimeoutMs = 
client.getConfiguration().getOperationTimeoutMs();
 
 Review comment:
   This implementation will block the caller of an asynchronous method, which might be unexpected. One way to do the retries would be to have an internal method that gets called asynchronously in recursion. For example, something like:
   
   ```java
   private CompletableFuture<MessageId> internalGetLastMessageIdAsync(Backoff backoff, long remainingTime) {
       if (connected) {
           // write on socket and return future
       } else {
           // if time is not elapsed yet...
           long nextDelay = backoff.next();
           executor.schedule(() -> {
               // a lambda cannot reassign the remainingTime parameter, so pass the reduced value on
               internalGetLastMessageIdAsync(backoff, remainingTime - timeSpentSinceLastCall);
           }, nextDelay, TimeUnit.MILLISECONDS);
       }
   }
   ```
   
   (Don't read too much into the previous example, I'm just trying to illustrate the basic idea)
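
For readers who want something they can run, the following is one possible self-contained version of that idea using only the JDK. It is a sketch under assumptions: `AsyncRetrySketch`, `callWhenReady` and the simple doubling delay are made up for this example and do not model Pulsar's `Backoff` class or the real consumer code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

final class AsyncRetrySketch {
    // Returns immediately; the future completes once `ready` is true or the time budget is spent.
    static <T> CompletableFuture<T> callWhenReady(ScheduledExecutorService executor,
                                                  BooleanSupplier ready, Supplier<T> op,
                                                  long initialDelayMs, long timeoutMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(executor, ready, op, initialDelayMs, timeoutMs, result);
        return result;
    }

    // One attempt: run the operation if ready, otherwise reschedule itself with a doubled delay.
    private static <T> void attempt(ScheduledExecutorService executor, BooleanSupplier ready,
                                    Supplier<T> op, long delayMs, long remainingMs,
                                    CompletableFuture<T> result) {
        try {
            if (ready.getAsBoolean()) {
                result.complete(op.get());
            } else if (remainingMs <= 0) {
                result.completeExceptionally(new TimeoutException("not ready before timeout"));
            } else {
                long wait = Math.min(delayMs, remainingMs);
                // Scheduling instead of sleeping keeps the calling thread free.
                executor.schedule(
                        () -> attempt(executor, ready, op, delayMs * 2, remainingMs - wait, result),
                        wait, TimeUnit.MILLISECONDS);
            }
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        long readyAt = System.currentTimeMillis() + 50; // pretend the connection comes back later
        CompletableFuture<String> f = callWhenReady(
                executor, () -> System.currentTimeMillis() >= readyAt, () -> "last-id", 10, 1000);
        System.out.println(f.join());
        executor.shutdown();
    }
}
```

The remaining time is passed to the next attempt as an argument rather than mutated, which avoids the captured-variable problem and keeps each scheduled task stateless.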




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168006084
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1254,103 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+return true;
+}
+
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture hasMessageAvailableAsync() {
+final CompletableFuture booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+if (!isConnected()) {
+long opTimeoutMs = 
client.getConfiguration().getOperationTimeoutMs();
+Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
+opTimeoutMs * 2, TimeUnit.MILLISECONDS,
+0 , TimeUnit.MILLISECONDS);
+
+long delayMs = backoff.firstBackoffTimeInMillis;;
+while (delayMs < opTimeoutMs && !isConnected()); {
+log.warn("[{}] [{}] Could not get connection while 
getLastMessageId -- Will try again in {} ms",
+topic, getHandlerName(), delayMs);
+try {
+Thread.sleep(delayMs);
+} catch (InterruptedException e) {
+return FutureUtil
+.failedFuture(new PulsarClientException
+.ConnectException("InterruptedException, could not 
connect"));
+}
+delayMs = backoff.next();
+}
+
+if (!isConnected()) {
+return FutureUtil.failedFuture(new PulsarClientException("Not 
connected to broker"));
+}
+}
+
+if (cnx().getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v12.getNumber()) {
 
 Review comment:
   Nit: please wrap this check in a method like:
   
   ```java
   boolean Commands.peerSupportsGetLastMessageId(ClientCnx cnx);
   ```




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167382822
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1254,77 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture hasMessageAvailableAsync() {
+final CompletableFuture booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+if (cnx().getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v11.getNumber()) {
 
 Review comment:
   If the consumer is not connected, `cnx()` will return null here, so this call will throw a `NullPointerException`.




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167382938
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1254,77 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture hasMessageAvailableAsync() {
+final CompletableFuture booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+if (cnx().getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v11.getNumber()) {
+return FutureUtil
+.failedFuture(new PulsarClientException
+.NotSupportedException("GetLastMessageId Not supported for 
ProtocolVersion: " +
+cnx().getRemoteEndpointProtocolVersion()));
+}
+
+if (!isConnected()) {
 
 Review comment:
   If the consumer is currently not connected, we should try to mask the exception from the user when the failure is transient.
   
   There is already an `operationTimeout` defined in the client, so it would be good to have a way to retry internally with backoff for up to that amount of time.




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167336300
 
 

 ##
 File path: pulsar-common/src/main/proto/PulsarApi.proto
 ##
 @@ -135,6 +135,7 @@ enum ProtocolVersion {
v9 = 9;  // Added end of topic notification
v10 = 10;// Added proxy to broker
v11 = 11;// C++ consumers before this version are not correctly 
handling the checksum field
+//Added get topic's last messageId from broker
 
 Review comment:
   @zhaijack There was a merge issue here: `v11` was already taken in master, so this PR should add `v12` now.




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-06 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166483266
 
 

 ##
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java
 ##
 @@ -62,4 +62,14 @@
  * Return true if the topic was terminated and this reader has reached the 
end of the topic
  */
 boolean hasReachedEndOfTopic();
+
+/**
+ * Check if there is message that has been published successfully to the 
broker in the topic.
+ */
+Boolean hasMessageAvailable() throws PulsarClientException;
 
 Review comment:
   To minimize the time spent "blocked", we could include the last-published message id when the broker sends the `CommandMessage`. Most of the time the client could then avoid asking, since it would already see that the last message id is ahead of its current position.




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-06 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166478972
 
 

 ##
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java
 ##
 @@ -62,4 +62,14 @@
  * Return true if the topic was terminated and this reader has reached the 
end of the topic
  */
 boolean hasReachedEndOfTopic();
+
+/**
+ * Check if there is message that has been published successfully to the 
broker in the topic.
+ */
+Boolean hasMessageAvailable() throws PulsarClientException;
 
 Review comment:
   `Boolean` -> `boolean`
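
The comment does not state its reasoning, but one common argument for the primitive type is that a boxed `Boolean` can be `null` and then fails when used in a condition. A tiny sketch of that hazard, using the hypothetical class `BoxedBooleanSketch`:

```java
final class BoxedBooleanSketch {
    // Hypothetical method returning the wrapper type; null is a legal return value.
    static Boolean boxed() {
        return null;
    }

    // Reports whether using the boxed result in a condition fails during unboxing.
    static boolean unboxingThrows() {
        try {
            if (boxed()) { // auto-unboxing a null Boolean throws here
                return false;
            }
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("unboxing a null Boolean throws: " + unboxingThrows());
    }
}
```

A primitive `boolean` return type rules this case out at compile time, so callers can write `while (reader.hasMessageAvailable())` without a null check.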




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-06 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166481485
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
 ##
 @@ -39,10 +39,11 @@
 protected final long entryId;
 protected final int partitionIndex;
 
-// Private constructor used only for json deserialization
-@SuppressWarnings("unused")
-private MessageIdImpl() {
-this(-1, -1, -1);
+// create MessageIdImpl use value of MessageId.earliest
+public MessageIdImpl() {
 
 Review comment:
   Since `MessageIdImpl` is immutable, couldn't we just reference `MessageId.earliest` instead of constructing a new instance equal to it?




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-06 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166479151
 
 

 ##
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java
 ##
 @@ -62,4 +62,14 @@
  * Return true if the topic was terminated and this reader has reached the 
end of the topic
  */
 boolean hasReachedEndOfTopic();
+
+/**
+ * Check if there is message that has been published successfully to the 
broker in the topic.
 
 Review comment:
   "Check if there is any message available to read from the current position"




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-06 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166483863
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
 ##
 @@ -28,8 +31,10 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
+import javax.validation.constraints.AssertFalse;
 
 Review comment:
   Use TestNG's `assertFalse` (`org.testng.Assert.assertFalse`) instead of the `javax.validation` annotation.




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-06 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166481095
 
 

 ##
 File path: pulsar-common/src/main/proto/PulsarApi.proto
 ##
 @@ -415,6 +415,16 @@ message CommandConsumerStatsResponse {
 optional uint64 msgBacklog  = 15;
 }
 
+message CommandGetLastMessageId {
 
 Review comment:
   @zhaijack The client should check that the broker's supported protocol version is `>= v11` before sending the command.




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-06 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166483613
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
 ##
 @@ -359,4 +362,59 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map keyMe
 reader.close();
 log.info("-- Exiting {} test --", methodName);
 }
+
+
+@Test
+public void testSimpleReaderReachEndofTopic() throws Exception {
 
 Review comment:
   One scenario that needs attention is when batches are used.
   
   With batches, multiple messages are stored in a single BK entry, and the broker treats a batch as a unit, which the consumer will finally break up, presenting the individual messages to the application.
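
One way the batch case could matter, sketched under assumptions: if the end-of-topic check compares entry ids only, a reader that has taken the first message of the last batch would look "caught up" even though messages remain inside that entry. The class and method names below are hypothetical and this is not how the PR implements the check.

```java
final class BatchAwareCheck {
    // Entry-level check: ignores messages still pending inside the current batch.
    static boolean entryLevelHasMore(long lastEntryId, long dequeuedEntryId) {
        return lastEntryId > dequeuedEntryId;
    }

    // Batch-aware check: also counts messages left in the batch being consumed.
    // dequeuedBatchIndex is the zero-based slot inside the entry, or -1 for a non-batch entry.
    static boolean batchAwareHasMore(long lastEntryId, long dequeuedEntryId,
                                     int dequeuedBatchIndex, int batchSize) {
        boolean moreInBatch = dequeuedBatchIndex >= 0 && dequeuedBatchIndex < batchSize - 1;
        return lastEntryId > dequeuedEntryId || moreInBatch;
    }

    public static void main(String[] args) {
        // The last entry (id 5) holds a batch of 3 messages; the reader has taken the first one.
        System.out.println("entry-level: " + entryLevelHasMore(5, 5));
        System.out.println("batch-aware: " + batchAwareHasMore(5, 5, 0, 3));
    }
}
```

A test for this scenario would publish with batching enabled and assert that the check stays true until the final message of the final batch has been read.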




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-06 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166482739
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
 ##
 @@ -453,6 +472,22 @@ SocketAddress serverAddrees() {
 return future;
 }
 
+public CompletableFuture sendGetLastMessageId(ByteBuf 
request, long requestId) {
+CompletableFuture future = new CompletableFuture<>();
+
+pendingGetLastMessageIdRequests.put(requestId, future);
 
 Review comment:
   Unfortunately we don't have a general way of handling it. 




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-01-21 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r162838288
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
 ##
 @@ -278,4 +277,19 @@
  * @return a future to track the completion of the seek operation
  */
 CompletableFuture seekAsync(MessageId messageId);
+
+/**
+ * Gets last message id of Topic.
+ *
+ * @return the last message id
+ */
+MessageId getLastMessageId() throws PulsarClientException;
 
 Review comment:
   In the above example, rate limiting in the broker will slow down consumption.




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-01-19 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r162751622
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
 ##
 @@ -278,4 +277,19 @@
  * @return a future to track the completion of the seek operation
  */
 CompletableFuture seekAsync(MessageId messageId);
+
+/**
+ * Gets last message id of Topic.
+ *
+ * @return the last message id
+ */
+MessageId getLastMessageId() throws PulsarClientException;
 
 Review comment:
   Rather than exposing this method, I would prefer to have a simpler way to check it, for a couple of reasons:
   
   1. Using and comparing message ids can be tricky (was that < or > ? :) )
   ```java
   MessageId lastMessageId = consumer.getLastMessageId();
   MessageId currentMessageId = null;
   // keep reading until the last known message has been received
   while (currentMessageId == null || currentMessageId.compareTo(lastMessageId) < 0) {
       Message msg = consumer.receive();
       // Do something
       currentMessageId = msg.getMessageId();
       consumer.acknowledge(msg);
   }
   ```
   2. Most typical code would be like the above snippet and would end up asking the broker for the last message id in a tight loop, without any throttling or flow control.
   
   Rather, I would prefer something like:
   
   ```java
   while (consumer.hasMessageAvailable()) {
       Message msg = consumer.receive();
       // Do something
       consumer.acknowledge(msg);
   }
   
   // Exits the loop when the consumer has caught up with the writer
   ```
   
   Internally, the client library can ask the broker for the last message id and cache it. For example, if `lastMessageId=123:12` but the consumer is currently receiving `110:5`, there's no need to keep asking the broker.
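
The caching idea can be sketched as follows. This is a simplified model, not the client's implementation: message ids are reduced to a single `long`, and `CachedEndCheck`, `askBroker` and `onDequeued` are hypothetical names introduced for the example.

```java
import java.util.function.LongSupplier;

final class CachedEndCheck {
    private final LongSupplier askBroker; // hypothetical round trip returning the last message id
    private long cachedLastId = -1;
    private long lastDequeuedId = -1;
    private int brokerCalls = 0;

    CachedEndCheck(LongSupplier askBroker) {
        this.askBroker = askBroker;
    }

    // Records the id of the message most recently handed to the application.
    void onDequeued(long id) {
        lastDequeuedId = id;
    }

    int brokerCalls() {
        return brokerCalls;
    }

    boolean hasMessageAvailable() {
        if (cachedLastId > lastDequeuedId) {
            return true; // the cached id already proves there is more to read
        }
        // Only when the cache is exhausted do we pay for a broker round trip.
        cachedLastId = askBroker.getAsLong();
        brokerCalls++;
        return cachedLastId > lastDequeuedId;
    }

    public static void main(String[] args) {
        CachedEndCheck check = new CachedEndCheck(() -> 3L); // topic holds ids 0..3
        long next = 0;
        while (check.hasMessageAvailable()) {
            check.onDequeued(next++);
        }
        System.out.println("messages read: " + next + ", broker calls: " + check.brokerCalls());
    }
}
```

In this model the broker is consulted once at the start and once more when the reader reaches the cached end, instead of once per loop iteration.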

