satishd commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704956559
##########
File path:
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##########
@@ -129,37 +132,44 @@ public void
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmen
}
// Publish the message to the topic.
-
doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
segmentMetadataUpdate);
+ return
doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
segmentMetadataUpdate);
} finally {
lock.readLock().unlock();
}
}
@Override
- public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata
remotePartitionDeleteMetadata)
+ public CompletableFuture<Void>
putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata
remotePartitionDeleteMetadata)
throws RemoteStorageException {
Objects.requireNonNull(remotePartitionDeleteMetadata,
"remotePartitionDeleteMetadata can not be null");
lock.readLock().lock();
try {
ensureInitializedAndNotClosed();
-
doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(),
remotePartitionDeleteMetadata);
+ return
doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(),
remotePartitionDeleteMetadata);
} finally {
lock.readLock().unlock();
}
}
- private void doPublishMetadata(TopicIdPartition topicIdPartition,
RemoteLogMetadata remoteLogMetadata)
+ private CompletableFuture<Void> doPublishMetadata(TopicIdPartition
topicIdPartition,
+ RemoteLogMetadata
remoteLogMetadata)
throws RemoteStorageException {
log.debug("Publishing metadata for partition: [{}] with context:
[{}]", topicIdPartition, remoteLogMetadata);
try {
// Publish the message to the topic.
- RecordMetadata recordMetadata =
producerManager.publishMessage(remoteLogMetadata);
- // Wait until the consumer catches up with this offset. This will
ensure read-after-write consistency
- // semantics.
- consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+ CompletableFuture<RecordMetadata> produceFuture = new
CompletableFuture<>();
+ producerManager.publishMessage(remoteLogMetadata, produceFuture);
+ return produceFuture.thenApplyAsync((Function<RecordMetadata,
Void>) recordMetadata -> {
+ try {
+
consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
Review comment:
`produceFuture` is completed after `ProducerRecordMetadata` is
completed. But `doPublishMetadata` takes `produceFuture` and composes with
`.thenApplyAsync()` and returns the `CompletableFuture` which will be completed
only after the `consumerManager.waitTillConsumptionCatchesUp(recordMetadata);`
is returned.
So, the returned `CompletableFuture` from `doPublishMetadata` is completed
only after the consumer is caughtup until the produced record offset. I will
document it in the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]