satishd commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704559745
##########
File path:
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##########
@@ -50,37 +51,38 @@ public
ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
topicPartitioner = rlmmTopicPartitioner;
}
- public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata)
throws KafkaException {
+ public CompletableFuture<RecordMetadata> publishMessage(RemoteLogMetadata
remoteLogMetadata) {
+ CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+
TopicIdPartition topicIdPartition =
remoteLogMetadata.topicIdPartition();
int metadataPartitionNum =
topicPartitioner.metadataPartition(topicIdPartition);
log.debug("Publishing metadata message of partition:[{}] into metadata
topic partition:[{}] with payload: [{}]",
- topicIdPartition, metadataPartitionNum, remoteLogMetadata);
+ topicIdPartition, metadataPartitionNum, remoteLogMetadata);
if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount())
{
// This should never occur as long as metadata partitions always
remain the same.
throw new KafkaException("Chosen partition no " +
metadataPartitionNum +
" must be less than the partition
count: " + rlmmConfig.metadataTopicPartitionsCount());
}
- ProducerCallback callback = new ProducerCallback();
try {
+ Callback callback = new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata,
+ Exception exception) {
+ if (exception != null) {
+ future.completeExceptionally(exception);
+ } else {
+ future.complete(metadata);
+ }
+ }
+ };
producer.send(new
ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum,
null,
- serde.serialize(remoteLogMetadata)), callback).get();
- } catch (KafkaException e) {
- throw e;
- } catch (Exception e) {
- throw new KafkaException("Exception occurred while publishing
message for topicIdPartition: " + topicIdPartition, e);
+
serde.serialize(remoteLogMetadata)), callback);
+ } catch (Exception ex) {
+ future.completeExceptionally(ex);
}
Review comment:
Throwing exception is removed as you can see in the diff.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]