ccding commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704546511



##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##########
@@ -50,37 +51,38 @@ public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
         topicPartitioner = rlmmTopicPartitioner;
     }
 
-    public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) throws KafkaException {
+    public CompletableFuture<RecordMetadata> publishMessage(RemoteLogMetadata remoteLogMetadata) {
+        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+
         TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition();
         int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition);
         log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]",
-                topicIdPartition, metadataPartitionNum, remoteLogMetadata);
+                  topicIdPartition, metadataPartitionNum, remoteLogMetadata);
         if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) {
             // This should never occur as long as metadata partitions always remain the same.
             throw new KafkaException("Chosen partition no " + metadataPartitionNum +
                                              " must be less than the partition count: " + rlmmConfig.metadataTopicPartitionsCount());
         }
 
-        ProducerCallback callback = new ProducerCallback();
         try {
+            Callback callback = new Callback() {
+                @Override
+                public void onCompletion(RecordMetadata metadata,
+                                         Exception exception) {
+                    if (exception != null) {
+                        future.completeExceptionally(exception);
+                    } else {
+                        future.complete(metadata);
+                    }
+                }
+            };
             producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, null,
-                    serde.serialize(remoteLogMetadata)), callback).get();
-        } catch (KafkaException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new KafkaException("Exception occurred while publishing message for topicIdPartition: " + topicIdPartition, e);
+                                               serde.serialize(remoteLogMetadata)), callback);
+        } catch (Exception ex) {
+            future.completeExceptionally(ex);
         }

Review comment:
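       Do we want to drop the topicIdPartition from the exception? The old code wrapped failures in a KafkaException whose message named the topicIdPartition; the new code completes the future with the raw exception, so that context is lost.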
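
       One way to keep that context, as a rough sketch only: wrap the cause before completing the future. To stay self-contained, the snippet below does not use the Kafka classes. `AsyncSender`, `PublishException`, `publish`, and `wrap` are hypothetical stand-ins (for the producer's callback-based send and for KafkaException), and plain strings stand in for the metadata types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class PublishContextSketch {

    /** Hypothetical stand-in for a callback-based asynchronous send; not Kafka's API. */
    interface AsyncSender {
        void send(String payload, BiConsumer<String, Exception> callback);
    }

    /** Hypothetical wrapper, playing the role KafkaException played in the old code. */
    static class PublishException extends RuntimeException {
        PublishException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static CompletableFuture<String> publish(AsyncSender sender, String topicIdPartition, String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            sender.send(payload, (metadata, exception) -> {
                if (exception != null) {
                    // Asynchronous failure: attach the partition before completing the future.
                    future.completeExceptionally(wrap(topicIdPartition, exception));
                } else {
                    future.complete(metadata);
                }
            });
        } catch (Exception ex) {
            // Synchronous failure thrown by send itself: same wrapping.
            future.completeExceptionally(wrap(topicIdPartition, ex));
        }
        return future;
    }

    private static PublishException wrap(String topicIdPartition, Exception cause) {
        return new PublishException(
            "Exception occurred while publishing message for topicIdPartition: " + topicIdPartition, cause);
    }
}
```

       With this shape, a caller that joins or chains on the future sees the partition in the message of the cause and can still reach the original exception through getCause().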

##########
File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
##########
@@ -62,16 +63,17 @@
      * @param remoteLogSegmentMetadata metadata about the remote log segment.
      * @throws RemoteStorageException   if there are any storage related errors occurred.
      * @throws IllegalArgumentException if the given metadata instance does not have the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
+     * @return a CompletableFuture which will complete once this operation is finished.
      */
-    void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
+    CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
 
     /**
-     * This method is used to update the {@link RemoteLogSegmentMetadata}. Currently, it allows to update with the new
+     * This method is used to update the {@link RemoteLogSegmentMetadata} asynchronously. Currently, it allows to update with the new
      * state based on the life cycle of the segment. It can go through the below state transitions.
      * <p>
      * <pre>
      * +---------------------+            +----------------------+
-     * |COPY_SEGMENT_STARTED |-----------&gt;|COPY_SEGMENT_FINISHED |
+     * |COPY_SEGMENT_STARTED |----------->|COPY_SEGMENT_FINISHED |

Review comment:
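       Can you verify that this still renders as correct HTML in the generated Javadoc? The change replaces the escaped `&gt;` with a literal `>` inside the `<pre>` block.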
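
       For reference, a small sketch of the two escaped forms I would expect to be safe. The `State` enum and its values are hypothetical and exist only to carry the comment. My understanding is that javadoc's doclint check can reject a bare `>` in a doc comment, which is why the entity (or `{@literal}`) is the usual choice, but please confirm against the actual build.

```java
public class JavadocArrowSketch {

    /**
     * Hypothetical states, used only to illustrate the escaping.
     * <pre>
     * +---------+            +----------+
     * | STARTED |-----------&gt;| FINISHED |
     * +---------+            +----------+
     * </pre>
     * The inline tag form also avoids a bare angle bracket: {@literal STARTED -> FINISHED}.
     */
    enum State { STARTED, FINISHED }
}
```

       Running the javadoc task on the module and opening the generated page for this interface should settle it either way.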



