rich-c-shop commented on code in PR #17099:
URL: https://github.com/apache/kafka/pull/17099#discussion_r2033443850


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1546,6 +1547,7 @@ private AppendCallbacks(Callback userCallback, 
ProducerInterceptors<K, V> interc
             // whole lifetime of the batch.
             // We don't want to have an NPE here, because the interceptors 
would not be notified (see .doSend).
             topic = record != null ? record.topic() : null;
+            headers = record != null ? record.headers() : new RecordHeaders();

Review Comment:
   fca85102d87580a593098b8c1eb5ee97d941a43f



##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java:
##########
@@ -81,12 +82,37 @@ public interface ProducerInterceptor<K, V> extends 
Configurable, AutoCloseable {
      * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset).
      *                 If an error occurred, metadata will contain only valid 
topic and maybe
      *                 partition. If partition is not given in ProducerRecord 
and an error occurs
-     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.NO_PARTITION.
+     *                 before partition gets assigned, then partition will be 
set to {@link RecordMetadata#UNKNOWN_PARTITION}.
      *                 The metadata may be null if the client passed null 
record to
      *                 {@link 
org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
      * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
      */
-    void onAcknowledgement(RecordMetadata metadata, Exception exception);
+    default void onAcknowledgement(RecordMetadata metadata, Exception 
exception) {}
+
+    /**
+     * This method is called when the record sent to the server has been 
acknowledged, or when sending the record fails before
+     * it gets sent to the server.
+     * <p>
+     * This method is generally called just before the user callback is 
called, and in additional cases when <code>KafkaProducer.send()</code>
+     * throws an exception.
+     * <p>
+     * Any exception thrown by this method will be ignored by the caller.
+     * <p>
+     * This method will generally execute in the background I/O thread, so the 
implementation should be reasonably fast.
+     * Otherwise, sending of messages from other threads could be delayed.
+     *
+     * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset).
+     *                 If an error occurred, metadata will contain only valid 
topic and maybe
+     *                 partition. If partition is not given in ProducerRecord 
and an error occurs
+     *                 before partition gets assigned, then partition will be 
set to {@link RecordMetadata#UNKNOWN_PARTITION}.
+     *                 The metadata may be null if the client passed null 
record to
+     *                 {@link 
org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
+     * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
+     * @param headers The headers for the record that was sent.

Review Comment:
   fca85102d87580a593098b8c1eb5ee97d941a43f



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to