[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168059642 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1248,6 +1256,102 @@ public void seek(MessageId messageId) throws PulsarClientException { return seekFuture; } +public boolean hasMessageAvailable() throws PulsarClientException { +try { +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +return true; +} + +return hasMessageAvailableAsync().get(); +} catch (ExecutionException | InterruptedException e) { +throw new PulsarClientException(e); +} +} + +public CompletableFuture hasMessageAvailableAsync() { +final CompletableFuture booleanFuture = new CompletableFuture<>(); + +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +getLastMessageIdAsync().thenAccept(messageId -> { +lastMessageIdInBroker = messageId; +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +booleanFuture.complete(false); +} +}).exceptionally(e -> { +log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); +booleanFuture.completeExceptionally(e.getCause()); +return null; +}); +} +return booleanFuture; +} + +private CompletableFuture getLastMessageIdAsync() { +if (getState() == State.Closing || getState() == State.Closed) { +return FutureUtil +.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); +} + +AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); +Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, +opTimeoutMs.get() * 2, 
 TimeUnit.MILLISECONDS, +0 , TimeUnit.MILLISECONDS); +CompletableFuture getLastMessageIdFuture = new CompletableFuture<>(); + +internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture); +return getLastMessageIdFuture; +} + +private void internalGetLastMessageIdAsync(final Backoff backoff, + final AtomicLong remainingTime, + CompletableFuture future) { +if (isConnected()) {

Review comment: I missed this before, but we should make sure the connection doesn't change while we're executing this method. `isConnected()` checks the current connection, but that connection might change by the time we call `cnx()` a few lines below. We need to get a reference to `ClientCnx` first and use it throughout the method.

```java
ClientCnx cnx = cnx();
if (cnx != null) {
    // check cnx.getRemoteEndpointProtocolVersion();
    cnx.sendGetLastMessageId()...
}
```

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
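A minimal, self-contained sketch of the "read the connection once" idea from the comment above. `CnxSketch`, `ConnectionSnapshotSketch`, the `String` message id, and the `MIN_VERSION` constant are hypothetical stand-ins rather than the real Pulsar classes; only the two method names are taken from the review.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ClientCnx; only the two calls named in the review are modeled.
interface CnxSketch {
    int getRemoteEndpointProtocolVersion();
    CompletableFuture<String> sendGetLastMessageId();
}

class ConnectionSnapshotSketch {
    static final int MIN_VERSION = 12; // assumed to mirror ProtocolVersion.v12

    // The handler's current connection; another thread may swap or clear it at any time.
    final AtomicReference<CnxSketch> current = new AtomicReference<>();

    CompletableFuture<String> getLastMessageId() {
        CnxSketch cnx = current.get(); // read once, then use only this local reference
        if (cnx == null) {
            return failed(new IllegalStateException("Not connected to broker"));
        }
        if (cnx.getRemoteEndpointProtocolVersion() < MIN_VERSION) {
            return failed(new UnsupportedOperationException("GetLastMessageId not supported by peer"));
        }
        // Same reference as the checks above, so a concurrent reconnect cannot slip in between.
        return cnx.sendGetLastMessageId();
    }

    private static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }
}
```

Because the null check, the version check, and the send all go through the same local variable, the method never mixes state from two different connections.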
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168054861 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -145,6 +151,9 @@ this.batchMessageAckTracker = new ConcurrentSkipListMap<>(); this.readCompacted = conf.getReadCompacted(); +this.getLastIdExecutor = Executors

Review comment: This would create one thread per consumer. We should reuse the executor that is already available from `PulsarClientImpl`.
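To illustrate the point about thread count, here is a small sketch in which the client owns a single scheduler and every consumer borrows it. The `Client`, `Consumer`, and `timer()` names are hypothetical and do not reflect the actual `PulsarClientImpl` API.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class SharedExecutorSketch {
    // Hypothetical client that owns one scheduler for all of its consumers.
    static class Client implements AutoCloseable {
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        ScheduledExecutorService timer() {
            return timer;
        }

        @Override
        public void close() {
            timer.shutdownNow(); // the owner, not the consumers, shuts the scheduler down
        }
    }

    // Hypothetical consumer: it borrows the client's scheduler instead of creating its own thread.
    static class Consumer {
        final ScheduledExecutorService executor;

        Consumer(Client client) {
            this.executor = client.timer();
        }
    }
}
```

With this shape, creating a thousand consumers still leaves the process with one scheduler thread, and the lifecycle of that thread stays with the client.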
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168054738 ## File path: pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java ## @@ -952,4 +952,8 @@ public static ByteBuf newLookup(String topic, boolean authoritative, String orig lookupBroker.recycle(); return res; } + +public static boolean peerSupportsGetLastMessageId() { +return getCurrentProtocolVersion() >= ProtocolVersion.v12.getNumber();

Review comment: This only checks our own protocol version (which is always the "latest" as of when we compiled the protobuf), but we need to check the version of the other side, in this case the broker. It should be something like:

```java
public static boolean peerSupportsGetLastMessageId(ClientCnx cnx) {
    // Compare against the version advertised by the remote endpoint, not our own.
    return cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v12.getNumber();
}
```
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168005589 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1248,6 +1254,103 @@ public void seek(MessageId messageId) throws PulsarClientException { return seekFuture; } +public boolean hasMessageAvailable() throws PulsarClientException { +try { +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +return true; +} + +return hasMessageAvailableAsync().get(); +} catch (ExecutionException | InterruptedException e) { +throw new PulsarClientException(e); +} +} + +public CompletableFuture hasMessageAvailableAsync() { +final CompletableFuture booleanFuture = new CompletableFuture<>(); + +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +getLastMessageIdAsync().thenAccept(messageId -> { +lastMessageIdInBroker = messageId; +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +booleanFuture.complete(false); +} +}).exceptionally(e -> { +log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); +booleanFuture.completeExceptionally(e.getCause()); +return null; +}); +} +return booleanFuture; +} + +private CompletableFuture getLastMessageIdAsync() { +if (getState() == State.Closing || getState() == State.Closed) { +return FutureUtil +.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); +} + +if (!isConnected()) { +long opTimeoutMs = client.getConfiguration().getOperationTimeoutMs();

Review comment: This implementation will block the caller of an asynchronous method, which might be unexpected. One way to do the retries would be to have an internal method that reschedules itself asynchronously until it succeeds or the time runs out. For example, something like:

```java
private void internalGetLastMessageIdAsync(Backoff backoff, long remainingTime,
                                           CompletableFuture<MessageId> future) {
    if (isConnected()) {
        // write on socket and complete `future` with the response
    } else if (remainingTime > 0) {
        // time has not elapsed yet: schedule another attempt instead of sleeping
        long nextDelay = Math.min(backoff.next(), remainingTime);
        executor.schedule(
                () -> internalGetLastMessageIdAsync(backoff, remainingTime - nextDelay, future),
                nextDelay, TimeUnit.MILLISECONDS);
    } else {
        future.completeExceptionally(new PulsarClientException("Not connected to broker"));
    }
}
```

(Don't read too much into the previous example, I'm just trying to illustrate the basic idea)
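The non-blocking retry described above can be sketched as a standalone class. This is an illustration under stated assumptions, not the PR's implementation: `AsyncRetrySketch`, the `attempt` supplier (which returns `null` to mean "not connected yet"), and the simple doubling delay are all hypothetical, standing in for the consumer's connection check and Pulsar's `Backoff`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class AsyncRetrySketch {
    // attempt.get() returns null while there is no connection (hypothetical convention),
    // otherwise the future of the request that was written on the socket.
    static <T> void retry(Supplier<CompletableFuture<T>> attempt,
                          long delayMs, long remainingMs,
                          ScheduledExecutorService executor,
                          CompletableFuture<T> result) {
        CompletableFuture<T> inFlight = attempt.get();
        if (inFlight != null) {
            // Connected: forward the outcome of the request to the caller's future.
            inFlight.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                } else {
                    result.complete(value);
                }
            });
            return;
        }
        if (remainingMs <= 0) {
            result.completeExceptionally(new TimeoutException("Not connected to broker"));
            return;
        }
        // Not connected: schedule the next attempt instead of sleeping on the caller's thread.
        long pauseMs = Math.min(delayMs, remainingMs);
        executor.schedule(
                () -> retry(attempt, delayMs * 2, remainingMs - pauseMs, executor, result),
                pauseMs, TimeUnit.MILLISECONDS);
    }
}
```

The caller gets its future back immediately; all waiting happens on the scheduler, and the time budget shrinks by exactly the pause that was scheduled.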
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168006084 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1248,6 +1254,103 @@ public void seek(MessageId messageId) throws PulsarClientException { return seekFuture; } +public boolean hasMessageAvailable() throws PulsarClientException { +try { +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +return true; +} + +return hasMessageAvailableAsync().get(); +} catch (ExecutionException | InterruptedException e) { +throw new PulsarClientException(e); +} +} + +public CompletableFuture hasMessageAvailableAsync() { +final CompletableFuture booleanFuture = new CompletableFuture<>(); + +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +getLastMessageIdAsync().thenAccept(messageId -> { +lastMessageIdInBroker = messageId; +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +booleanFuture.complete(false); +} +}).exceptionally(e -> { +log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); +booleanFuture.completeExceptionally(e.getCause()); +return null; +}); +} +return booleanFuture; +} + +private CompletableFuture getLastMessageIdAsync() { +if (getState() == State.Closing || getState() == State.Closed) { +return FutureUtil +.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); +} + +if (!isConnected()) { +long opTimeoutMs = client.getConfiguration().getOperationTimeoutMs(); +Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, +opTimeoutMs * 2, 
 TimeUnit.MILLISECONDS, +0 , TimeUnit.MILLISECONDS); + +long delayMs = backoff.firstBackoffTimeInMillis;; +while (delayMs < opTimeoutMs && !isConnected()); { +log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms", +topic, getHandlerName(), delayMs); +try { +Thread.sleep(delayMs); +} catch (InterruptedException e) { +return FutureUtil +.failedFuture(new PulsarClientException +.ConnectException("InterruptedException, could not connect")); +} +delayMs = backoff.next(); +} + +if (!isConnected()) { +return FutureUtil.failedFuture(new PulsarClientException("Not connected to broker")); +} +} + +if (cnx().getRemoteEndpointProtocolVersion() < ProtocolVersion.v12.getNumber()) {

Review comment: Nit: please wrap this in a method like:

```java
boolean Commands.peerSupportsGetLastMessageId(ClientCnx cnx);
```
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167382822 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1248,6 +1254,77 @@ public void seek(MessageId messageId) throws PulsarClientException { return seekFuture; } +public boolean hasMessageAvailable() throws PulsarClientException { +try { +return hasMessageAvailableAsync().get(); +} catch (ExecutionException | InterruptedException e) { +throw new PulsarClientException(e); +} +} + +public CompletableFuture hasMessageAvailableAsync() { +final CompletableFuture booleanFuture = new CompletableFuture<>(); + +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +getLastMessageIdAsync().thenAccept(messageId -> { +lastMessageIdInBroker = messageId; +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +booleanFuture.complete(false); +} +}).exceptionally(e -> { +log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); +booleanFuture.completeExceptionally(e.getCause()); +return null; +}); +} +return booleanFuture; +} + +private CompletableFuture getLastMessageIdAsync() { +if (getState() == State.Closing || getState() == State.Closed) { +return FutureUtil +.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); +} + +if (cnx().getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {

Review comment: If it's not connected, `cnx()` will return null here.
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167382938 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1248,6 +1254,77 @@ public void seek(MessageId messageId) throws PulsarClientException { return seekFuture; } +public boolean hasMessageAvailable() throws PulsarClientException { +try { +return hasMessageAvailableAsync().get(); +} catch (ExecutionException | InterruptedException e) { +throw new PulsarClientException(e); +} +} + +public CompletableFuture hasMessageAvailableAsync() { +final CompletableFuture booleanFuture = new CompletableFuture<>(); + +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +getLastMessageIdAsync().thenAccept(messageId -> { +lastMessageIdInBroker = messageId; +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +booleanFuture.complete(false); +} +}).exceptionally(e -> { +log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); +booleanFuture.completeExceptionally(e.getCause()); +return null; +}); +} +return booleanFuture; +} + +private CompletableFuture getLastMessageIdAsync() { +if (getState() == State.Closing || getState() == State.Closed) { +return FutureUtil +.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); +} + +if (cnx().getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) { +return FutureUtil +.failedFuture(new PulsarClientException +.NotSupportedException("GetLastMessageId Not supported for ProtocolVersion: " + +cnx().getRemoteEndpointProtocolVersion())); +} + +if (!isConnected()) { Review comment: If it's currently not 
connected, we should try to hide the failure from the user when it is transient. The client already defines an `operationTimeout`; it would be good to retry internally, with backoff, for up to that amount of time.
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167336300 ## File path: pulsar-common/src/main/proto/PulsarApi.proto ## @@ -135,6 +135,7 @@ enum ProtocolVersion { v9 = 9; // Added end of topic notification v10 = 10;// Added proxy to broker v11 = 11;// C++ consumers before this version are not correctly handling the checksum field +//Added get topic's last messageId from broker

Review comment: @zhaijack There was a merge issue here: `v11` was already taken in master, so this PR should now be adding `v12`.
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166483266 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java ## @@ -62,4 +62,14 @@ * Return true if the topic was terminated and this reader has reached the end of the topic */ boolean hasReachedEndOfTopic(); + +/** + * Check if there is message that has been published successfully to the broker in the topic. + */ +Boolean hasMessageAvailable() throws PulsarClientException;

Review comment: To minimize the time spent "blocked", we could include the last-published message id when the broker sends the `CommandMessage`. The client could then avoid asking most of the time, since it would already see that the last published message is ahead of its current position.
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166478972 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java ## @@ -62,4 +62,14 @@ * Return true if the topic was terminated and this reader has reached the end of the topic */ boolean hasReachedEndOfTopic(); + +/** + * Check if there is message that has been published successfully to the broker in the topic. + */ +Boolean hasMessageAvailable() throws PulsarClientException;

Review comment: `Boolean` -> `boolean`
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166481485 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java ## @@ -39,10 +39,11 @@ protected final long entryId; protected final int partitionIndex; -// Private constructor used only for json deserialization -@SuppressWarnings("unused") -private MessageIdImpl() { -this(-1, -1, -1); +// create MessageIdImpl use value of MessageId.earliest +public MessageIdImpl() {

Review comment: Since `MessageIdImpl` is immutable, couldn't we just reference `MessageId.earliest` instead of constructing a new instance equal to it?
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166479151 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java ## @@ -62,4 +62,14 @@ * Return true if the topic was terminated and this reader has reached the end of the topic */ boolean hasReachedEndOfTopic(); + +/** + * Check if there is message that has been published successfully to the broker in the topic.

Review comment: "Check if there is any message available to read from the current position"
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166483863 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java ## @@ -28,8 +31,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; - +import javax.validation.constraints.AssertFalse;

Review comment: Use TestNG's `assertFalse` instead.
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166481095 ## File path: pulsar-common/src/main/proto/PulsarApi.proto ## @@ -415,6 +415,16 @@ message CommandConsumerStatsResponse { optional uint64 msgBacklog = 15; } +message CommandGetLastMessageId {

Review comment: @zhaijack The client should check that the broker's supported protocol version is `>= v11` before sending the command.
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166483613 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java ## @@ -359,4 +362,59 @@ public EncryptionKeyInfo getPrivateKey(String keyName, MapkeyMe reader.close(); log.info("-- Exiting {} test --", methodName); } + + +@Test +public void testSimpleReaderReachEndofTopic() throws Exception {

Review comment: One scenario that needs attention is when batches are used. With batching, multiple messages are stored in a single BK entry, and the broker treats a batch as a unit. The consumer finally breaks the batch up and presents the individual messages to the application.
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r166482739 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java ## @@ -453,6 +472,22 @@ SocketAddress serverAddrees() { return future; } +public CompletableFuture sendGetLastMessageId(ByteBuf request, long requestId) { +CompletableFuture future = new CompletableFuture<>(); + +pendingGetLastMessageIdRequests.put(requestId, future);

Review comment: Unfortunately we don't have a general way of handling it.
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r162838288 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java ## @@ -278,4 +277,19 @@ * @return a future to track the completion of the seek operation */ CompletableFuture seekAsync(MessageId messageId); + +/** + * Gets last message id of Topic. + * + * @return the last message id + */ +MessageId getLastMessageId() throws PulsarClientException;

Review comment: In the above example, rate limiting in the broker will slow down consumption.
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r162751622 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java ## @@ -278,4 +277,19 @@ * @return a future to track the completion of the seek operation */ CompletableFuture seekAsync(MessageId messageId); + +/** + * Gets last message id of Topic. + * + * @return the last message id + */ +MessageId getLastMessageId() throws PulsarClientException;

Review comment: Rather than exposing this method, I would prefer to have a simpler way to check it, for a couple of reasons:

1. Using and comparing message ids can be tricky (was that < or > ? :) )

```java
MessageId lastMessageId = consumer.getLastMessageId();
MessageId currentMessageId = null;
// Loop until the message we just received is the last one
while (currentMessageId == null || currentMessageId.compareTo(lastMessageId) < 0) {
    Message msg = consumer.receive();
    // Do something
    consumer.acknowledge(msg);
    currentMessageId = msg.getMessageId();
}
```

2. Most typical code would look like the above snippet and run a tight loop asking the broker for the last message id, without any throttling or flow control.

Rather, I would prefer something like:

```java
while (consumer.hasMessageAvailable()) {
    Message msg = consumer.receive();
    // Do something
    consumer.acknowledge(msg);
}
// Exits the loop when the consumer has caught up with the writer
```

Internally, the client library can ask the broker for the last message id and cache it. For example, if `lastMessageId=123:12` but the consumer is currently receiving `110:5`, there's no need to keep asking the broker.
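The caching idea in the last paragraph can be sketched as follows. Everything here is a hypothetical stand-in: `Position` models a `ledgerId:entryId` pair such as `123:12`, `askBroker` stands in for the round trip to the broker, and `brokerRequests` exists only to make the number of round trips visible. Batching, which a real implementation would also need to account for, is not modeled.

```java
import java.util.function.Supplier;

class HasMessageAvailableSketch {
    // Hypothetical position, ordered by ledger id and then by entry id.
    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    private final Supplier<Position> askBroker; // stands in for the broker round trip
    private Position lastInBroker = new Position(-1, -1);
    private Position lastDequeued = new Position(-1, -1);
    int brokerRequests = 0;

    HasMessageAvailableSketch(Supplier<Position> askBroker) {
        this.askBroker = askBroker;
    }

    boolean hasMessageAvailable() {
        // The cached id is already ahead of our position: answer without asking the broker.
        if (lastInBroker.compareTo(lastDequeued) > 0) {
            return true;
        }
        // We caught up with the cached id, so refresh it once and compare again.
        brokerRequests++;
        lastInBroker = askBroker.get();
        return lastInBroker.compareTo(lastDequeued) > 0;
    }

    void markDequeued(Position position) {
        lastDequeued = position;
    }
}
```

In this sketch the broker is consulted only when the reader has caught up with the cached id, so a reader that is far behind pays for a single round trip rather than one per message.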