junrao commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704689875
##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##########
@@ -129,37 +132,44 @@ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmen
             }
 
             // Publish the message to the topic.
-            doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate);
+            return doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate);
         } finally {
             lock.readLock().unlock();
         }
     }
 
     @Override
-    public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
+    public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
             throws RemoteStorageException {
         Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null");
 
         lock.readLock().lock();
         try {
             ensureInitializedAndNotClosed();
 
-            doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata);
+            return doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata);
         } finally {
             lock.readLock().unlock();
         }
     }
 
-    private void doPublishMetadata(TopicIdPartition topicIdPartition, RemoteLogMetadata remoteLogMetadata)
+    private CompletableFuture<Void> doPublishMetadata(TopicIdPartition topicIdPartition,
+                                                      RemoteLogMetadata remoteLogMetadata)
             throws RemoteStorageException {
         log.debug("Publishing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadata);
 
         try {
             // Publish the message to the topic.
-            RecordMetadata recordMetadata = producerManager.publishMessage(remoteLogMetadata);
-            // Wait until the consumer catches up with this offset. This will ensure read-after-write consistency
-            // semantics.
-            consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+            CompletableFuture<RecordMetadata> produceFuture = new CompletableFuture<>();
+            producerManager.publishMessage(remoteLogMetadata, produceFuture);
+            return produceFuture.thenApplyAsync((Function<RecordMetadata, Void>) recordMetadata -> {
+                try {
+                    consumerManager.waitTillConsumptionCatchesUp(recordMetadata);

Review comment:
   Thanks for the explanation. Semantically, when do we expect the CompletableFuture returned from doPublishMetadata() to complete? The implementation completes it after the metadata is acked by the producer. However, I thought it should complete only after the consumer has caught up on that offset? It would be useful to document this clearly in the API.
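   For illustration, here is a minimal sketch of the completion semantics the comment argues for: the returned future completes only after the consumer has caught up to the produced offset, so completion itself implies read-after-write consistency. The ProducerManager/ConsumerManager interfaces and all names below are hypothetical stand-ins, not the PR's actual classes.

   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;

   public class PublishCompletionSketch {

       interface ProducerManager {
           // Hypothetical: completes on producer ack with the offset of the produced record.
           CompletableFuture<Long> publish(Object metadata);
       }

       interface ConsumerManager {
           // Hypothetical: blocks until the consumer has consumed up to the given offset.
           void waitTillConsumptionCatchesUp(long offset) throws InterruptedException;
       }

       private final ProducerManager producerManager;
       private final ConsumerManager consumerManager;

       PublishCompletionSketch(ProducerManager producerManager, ConsumerManager consumerManager) {
           this.producerManager = producerManager;
           this.consumerManager = consumerManager;
       }

       // The returned future completes only after the consumer has caught up to the
       // produced offset, not merely on producer ack; a completed future therefore
       // means the published metadata is readable back from this manager.
       CompletableFuture<Void> publishAndWaitForCatchUp(Object metadata) {
           return producerManager.publish(metadata)
                   .thenAcceptAsync(offset -> {
                       try {
                           // Chain the catch-up wait into the returned future, so the
                           // caller cannot observe completion before the consumer catches up.
                           consumerManager.waitTillConsumptionCatchesUp(offset);
                       } catch (InterruptedException e) {
                           Thread.currentThread().interrupt();
                           throw new CompletionException(e);
                       }
                   });
       }
   }

   Either choice of completion point can be reasonable; the ambiguity the comment raises would be resolved by stating in the method's javadoc whether completion means "acked by the producer" or "readable from this manager".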