chia7712 commented on code in PR #17099:
URL: https://github.com/apache/kafka/pull/17099#discussion_r1827065687


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1552,6 +1552,7 @@ private class AppendCallbacks implements 
RecordAccumulator.AppendCallbacks {
         private final String recordLogString;
         private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
         private volatile TopicPartition topicPartition;
+        private Headers headers;

Review Comment:
   Please add `final`.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java:
##########
@@ -107,14 +110,21 @@ public void onAcknowledgement(RecordMetadata metadata, 
Exception exception) {
     public void onSendError(ProducerRecord<K, V> record, TopicPartition 
interceptTopicPartition, Exception exception) {
         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
             try {
+                Headers headers = record != null ? record.headers() : null;
+                if (headers instanceof RecordHeaders) {
+                    // make a copy of the headers to make sure it is read-only
+                    RecordHeaders recordHeaders = (RecordHeaders) headers;
+                    headers = new RecordHeaders(recordHeaders);
+                    ((RecordHeaders) headers).setReadOnly();
+                }
                 if (record == null && interceptTopicPartition == null) {
-                    interceptor.onAcknowledgement(null, exception);
+                    interceptor.onAcknowledgement(null, exception, headers);

Review Comment:
   We must call `onAcknowledgement(RecordMetadata, Exception, Headers)` even if 
`headers` is null. Otherwise, users who implement only the new method will miss 
this event.
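
To illustrate the point above, here is a minimal, self-contained sketch of the dispatch behaviour. It does not use the Kafka classes: `SketchInterceptor`, `InterceptorDispatchSketch`, `LegacyInterceptor`, and `HeaderAwareInterceptor` are hypothetical stand-ins, and metadata and headers are modelled as plain `String`s. The assumption is only that the three-argument overload defaults to delegating to the two-argument one, as in the diff further down in this review.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ProducerInterceptor; metadata and headers are plain Strings here.
interface SketchInterceptor {
    // Old callback, now a no-op by default.
    default void onAcknowledgement(String metadata, Exception exception) { }

    // New callback; by default it delegates to the old one.
    default void onAcknowledgement(String metadata, Exception exception, String headers) {
        onAcknowledgement(metadata, exception);
    }
}

final class InterceptorDispatchSketch {
    // Implements only the old two-argument method.
    static final class LegacyInterceptor implements SketchInterceptor {
        final List<String> events = new ArrayList<>();

        @Override
        public void onAcknowledgement(String metadata, Exception exception) {
            events.add("legacy:" + metadata);
        }
    }

    // Implements only the new three-argument method.
    static final class HeaderAwareInterceptor implements SketchInterceptor {
        final List<String> events = new ArrayList<>();

        @Override
        public void onAcknowledgement(String metadata, Exception exception, String headers) {
            events.add("new:" + metadata + ":" + headers);
        }
    }

    // Always calls the three-argument overload, even when headers is null.
    static void dispatch(List<SketchInterceptor> interceptors,
                         String metadata, Exception exception, String headers) {
        for (SketchInterceptor interceptor : interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception, headers);
            } catch (Exception e) {
                // Exceptions thrown by an interceptor are ignored by the caller.
            }
        }
    }
}
```

Because the caller always uses the three-argument overload, both kinds of interceptor see the event: the legacy one through the default delegation, the header-aware one directly. If the caller fell back to the two-argument method when `headers` is null, the header-aware interceptor would silently miss it.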



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java:
##########
@@ -107,14 +110,21 @@ public void onAcknowledgement(RecordMetadata metadata, 
Exception exception) {
     public void onSendError(ProducerRecord<K, V> record, TopicPartition 
interceptTopicPartition, Exception exception) {
         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
             try {
+                Headers headers = record != null ? record.headers() : null;
+                if (headers instanceof RecordHeaders) {
+                    // make a copy of the headers to make sure it is read-only

Review Comment:
   We don't need to copy it if it is already read-only, right?
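
A rough sketch of the "copy only when still mutable" idea, for illustration only. `SketchHeaders` is a hypothetical stand-in, not Kafka's `RecordHeaders`. In particular, the `isReadOnly()` accessor and the `readOnlyView` helper are assumptions made for this example and may not correspond to anything the real class exposes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a headers container; NOT Kafka's RecordHeaders.
final class SketchHeaders {
    private final List<String> entries;
    private boolean readOnly;

    SketchHeaders() {
        this.entries = new ArrayList<>();
    }

    // Copy constructor: the copy starts out mutable, whatever the state of the source.
    SketchHeaders(SketchHeaders other) {
        this.entries = new ArrayList<>(other.entries);
    }

    void add(String entry) {
        if (readOnly) {
            throw new IllegalStateException("headers are read-only");
        }
        entries.add(entry);
    }

    void setReadOnly() {
        readOnly = true;
    }

    // Assumed accessor, introduced only for this sketch.
    boolean isReadOnly() {
        return readOnly;
    }

    int size() {
        return entries.size();
    }

    // Returns the same instance if it is null or already read-only;
    // otherwise returns a read-only copy and leaves the original mutable.
    static SketchHeaders readOnlyView(SketchHeaders headers) {
        if (headers == null || headers.isReadOnly()) {
            return headers;
        }
        SketchHeaders copy = new SketchHeaders(headers);
        copy.setReadOnly();
        return copy;
    }
}
```

The copy is skipped whenever the headers can no longer be modified, which avoids an allocation per interceptor call in that case.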



##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java:
##########
@@ -86,7 +87,37 @@ public interface ProducerInterceptor<K, V> extends 
Configurable, AutoCloseable {
      *                 {@link 
org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
      * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
      */
-    void onAcknowledgement(RecordMetadata metadata, Exception exception);
+    default void onAcknowledgement(RecordMetadata metadata, Exception 
exception) {};
+
+    /**
+     * This method is called when the record sent to the server has been 
acknowledged, or when sending the record fails before
+     * it gets sent to the server.
+     * <p>
+     * This method is called just before the user callback is invoked, or in 
cases when
+     * <code>KafkaProducer.send()</code> throws an exception.
+     * throws an exception.
+     * <p>
+     * Note that any exception thrown by this method will be ignored by the 
caller.
+     * <p>
+     * The implementation of this method should be fast as it generally 
executes in a background
+     * I/O thread. A slow implementation could delay the sending of messages 
from other threads.
+
+     * Otherwise, sending of messages from other threads could be delayed.
+     *
+     * @param metadata The metadata for the record that was sent. It includes 
the partition and offset  of the record.
+     *                If an error occurred, the metadata will contain only the 
topic  and possibly the partition.
+
+     *                If the partition was not assigned yet due to an error,
+     *                 it will be set to {@link 
org.apache.kafka.clients.producer.RecordMetadata#UNKNOWN_PARTITION}.
+     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.NO_PARTITION.
+     *                 The metadata may be null if the client passed null 
record to
+     *                 {@link 
org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
+     * @param exception The exception thrown during the processing of this 
record, or null if no error occurred.
+     * @param headers The headers for the record that was sent. This parameter 
may be null.
+     */
+    default void onAcknowledgement(RecordMetadata metadata, Exception 
exception, Headers headers) {
+        onAcknowledgement(metadata, exception);
+    };

Review Comment:
   Please remove `;`


