satishd commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704021729



##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##########
@@ -129,37 +132,44 @@ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmen
             }
 
             // Publish the message to the topic.
-            doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate);
+            return doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate);
         } finally {
             lock.readLock().unlock();
         }
     }
 
     @Override
-    public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
+    public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
             throws RemoteStorageException {
         Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null");
 
         lock.readLock().lock();
         try {
             ensureInitializedAndNotClosed();
 
-            doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata);
+            return doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata);
         } finally {
             lock.readLock().unlock();
         }
     }
 
-    private void doPublishMetadata(TopicIdPartition topicIdPartition, RemoteLogMetadata remoteLogMetadata)
+    private CompletableFuture<Void> doPublishMetadata(TopicIdPartition topicIdPartition,
+                                                      RemoteLogMetadata remoteLogMetadata)
             throws RemoteStorageException {
         log.debug("Publishing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadata);
 
         try {
             // Publish the message to the topic.
-            RecordMetadata recordMetadata = producerManager.publishMessage(remoteLogMetadata);
-            // Wait until the consumer catches up with this offset. This will ensure read-after-write consistency
-            // semantics.
-            consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+            CompletableFuture<RecordMetadata> produceFuture = new CompletableFuture<>();
+            producerManager.publishMessage(remoteLogMetadata, produceFuture);
+            return produceFuture.thenApplyAsync((Function<RecordMetadata, Void>) recordMetadata -> {
+                try {
+                    consumerManager.waitTillConsumptionCatchesUp(recordMetadata);

Review comment:
   This is not invoked from the producer's sender thread because it runs asynchronously; you can see the stack trace below. By default, `thenApplyAsync` runs its callback on the common `ForkJoinPool` when the pool supports parallelism.
   
   ```
   org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.waitTillConsumptionCatchesUp(ConsumerManager.java:80)
   org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$doPublishMetadata$0(TopicBasedRemoteLogMetadataManager.java:165)
   java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
   java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479)
   java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
   java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
   java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
   java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
   java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
   ```
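   
   As a side note, here is a minimal, self-contained sketch (not from the PR; the class name, thread name, and payload string are made up for illustration) showing the behaviour described above: `thenApplyAsync` without an explicit executor dispatches the callback to the common `ForkJoinPool`, even when the future is completed from a different thread standing in for the producer's sender thread.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   public class ThenApplyAsyncDemo {
       public static void main(String[] args) throws Exception {
           CompletableFuture<String> produceFuture = new CompletableFuture<>();
   
           // No executor argument: the callback is scheduled on the common ForkJoinPool
           // (assuming its parallelism is greater than 1), not on the completing thread.
           CompletableFuture<Void> result = produceFuture.thenApplyAsync(recordMetadata -> {
               System.out.println("callback runs on: " + Thread.currentThread().getName());
               return null;
           });
   
           // Complete the future from a separate thread, playing the role of the
           // producer's sender thread in the PR.
           ExecutorService sender = Executors.newSingleThreadExecutor(r -> new Thread(r, "sender-thread"));
           sender.submit(() -> produceFuture.complete("record-metadata"));
   
           result.join();
           sender.shutdown();
           // Typical output: "callback runs on: ForkJoinPool.commonPool-worker-3"
       }
   }
   ```
   
   If the callback should not land on the common pool, the overload of `thenApplyAsync` that takes an `Executor` can be used to direct it to a dedicated thread pool instead.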



