satishd commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704559745



##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##########
@@ -50,37 +51,38 @@ public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
         topicPartitioner = rlmmTopicPartitioner;
     }
 
-    public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) throws KafkaException {
+    public CompletableFuture<RecordMetadata> publishMessage(RemoteLogMetadata remoteLogMetadata) {
+        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+
         TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition();
         int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition);
         log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]",
-                topicIdPartition, metadataPartitionNum, remoteLogMetadata);
+                  topicIdPartition, metadataPartitionNum, remoteLogMetadata);
         if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) {
             // This should never occur as long as metadata partitions always remain the same.
             throw new KafkaException("Chosen partition no " + metadataPartitionNum +
                                              " must be less than the partition count: " + rlmmConfig.metadataTopicPartitionsCount());
         }
 
-        ProducerCallback callback = new ProducerCallback();
         try {
+            Callback callback = new Callback() {
+                @Override
+                public void onCompletion(RecordMetadata metadata,
+                                         Exception exception) {
+                    if (exception != null) {
+                        future.completeExceptionally(exception);
+                    } else {
+                        future.complete(metadata);
+                    }
+                }
+            };
             producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, null,
-                    serde.serialize(remoteLogMetadata)), callback).get();
-        } catch (KafkaException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new KafkaException("Exception occurred while publishing message for topicIdPartition: " + topicIdPartition, e);
+                                               serde.serialize(remoteLogMetadata)), callback);
+        } catch (Exception ex) {
+            future.completeExceptionally(ex);
         }

Review comment:
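       Throwing the exception has been removed here, as you can see in the diff: the catch block now completes the returned future exceptionally instead of rethrowing.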
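
       For illustration, here is a minimal sketch of the pattern the diff adopts: bridging a callback-style send into a `CompletableFuture` so that both asynchronous failures and synchronous exceptions from the send call reach the caller through the future. It does not use the Kafka client; `AsyncSender`, `publish`, and the string payloads are hypothetical stand-ins invented for this example, not names from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class FutureCallbackSketch {

    // Hypothetical stand-in for a callback-based sender (assumption, not a Kafka API):
    // it reports either a result or an exception through the callback.
    interface AsyncSender {
        void send(String payload, BiConsumer<String, Exception> callback);
    }

    // Bridges the callback-style send into a CompletableFuture.
    static CompletableFuture<String> publish(AsyncSender sender, String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            sender.send(payload, (result, exception) -> {
                if (exception != null) {
                    // Asynchronous failure reported by the sender.
                    future.completeExceptionally(exception);
                } else {
                    future.complete(result);
                }
            });
        } catch (Exception ex) {
            // Synchronous failure raised by send() itself: surface it through
            // the future instead of rethrowing.
            future.completeExceptionally(ex);
        }
        return future;
    }

    public static void main(String[] args) {
        // A sender that acknowledges immediately.
        AsyncSender working = (payload, cb) -> cb.accept("ack:" + payload, null);
        System.out.println(publish(working, "m1").join());

        // A sender whose send() throws synchronously.
        AsyncSender broken = (payload, cb) -> {
            throw new IllegalStateException("sender closed");
        };
        publish(broken, "m2").whenComplete((result, error) ->
                System.out.println("failed: " + error.getMessage()));
    }
}
```

       With this shape the caller decides whether to block (`join()`/`get()`) or to chain further work, and never needs a try/catch around the call for send failures.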



