lianetm commented on code in PR #17099:
URL: https://github.com/apache/kafka/pull/17099#discussion_r1901095800


##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java:
##########
@@ -81,12 +82,40 @@ public interface ProducerInterceptor<K, V> extends 
Configurable, AutoCloseable {
      * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset).
      *                 If an error occurred, metadata will contain only valid 
topic and maybe
      *                 partition. If partition is not given in ProducerRecord 
and an error occurs
-     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.NO_PARTITION.
+     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.UNKNOWN_PARTITION.
      *                 The metadata may be null if the client passed null 
record to
      *                 {@link 
org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
      * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
      */
-    void onAcknowledgement(RecordMetadata metadata, Exception exception);
+    default void onAcknowledgement(RecordMetadata metadata, Exception 
exception) {}
+
+    /**
+     * This method is called when the record sent to the server has been 
acknowledged, or when sending the record fails before
+     * it gets sent to the server.
+     * <p>
+     * This method is called just before the user callback is invoked, or in 
cases when
+     * <code>KafkaProducer.send()</code> throws an exception.
+     * throws an exception.
+     * <p>
+     * Note that any exception thrown by this method will be ignored by the 
caller.
+     * <p>
+     * The implementation of this method should be fast as it generally 
executes in a background
+     * I/O thread. A slow implementation could delay the sending of messages 
from other threads.
+
+     * Otherwise, sending of messages from other threads could be delayed.

Review Comment:
   We have kind of a duplicate here 
   ```
   A slow implementation could delay the sending of messages from other threads.
   ```
   and
   ```
   Otherwise, sending of messages from other threads could be delayed.
   ```
   I would maybe keep the latter just to keep the same we have in the other 
`onAcknowledgement` 



##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -1099,6 +1105,29 @@ private <T extends Serializer<String>> void 
doTestHeaders(Class<T> serializerCla
         producer.close(Duration.ofMillis(0));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testHeadersFailure() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 500);

Review Comment:
   I would push this config even lower (5 maybe?). This will make the test 
block uselessly for 500 ms on `waitOnMetadata`, to then fail (which is truly 
the bit this test is after, so let's get there asap and avoid making the test 
suite slower)



##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java:
##########
@@ -81,12 +82,40 @@ public interface ProducerInterceptor<K, V> extends 
Configurable, AutoCloseable {
      * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset).
      *                 If an error occurred, metadata will contain only valid 
topic and maybe
      *                 partition. If partition is not given in ProducerRecord 
and an error occurs
-     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.NO_PARTITION.
+     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.UNKNOWN_PARTITION.

Review Comment:
   what about using a link to avoid this falling behind again 
   ```suggestion
        *                 before partition gets assigned, then partition will 
be set to {@link RecordMetadata#UNKNOWN_PARTITION}.
   ```



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java:
##########
@@ -107,14 +110,21 @@ public void onAcknowledgement(RecordMetadata metadata, 
Exception exception) {
     public void onSendError(ProducerRecord<K, V> record, TopicPartition 
interceptTopicPartition, Exception exception) {
         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
             try {
+                Headers headers = record != null ? record.headers() : null;
+                if (headers instanceof RecordHeaders && !((RecordHeaders) 
headers).isReadOnly()) {
+                    // make a copy of the headers to make sure it is read-only

Review Comment:
   I agree with how you extended the comment @rich-c-shop, but still I find we 
could better clarify what @chia7712 suggested. What about something along the 
lines of:
   ```suggestion
                       // make a copy of the headers to make sure we don't 
change the state of origin record's headers.
                       // Original headers are still writable because client 
might want to mutate them before retrying.
   ```
   (basically we have 2 things here, we don't want to change the record headers 
within this func, and they also need to be writable)



##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java:
##########
@@ -81,12 +82,40 @@ public interface ProducerInterceptor<K, V> extends 
Configurable, AutoCloseable {
      * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset).
      *                 If an error occurred, metadata will contain only valid 
topic and maybe
      *                 partition. If partition is not given in ProducerRecord 
and an error occurs
-     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.NO_PARTITION.
+     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.UNKNOWN_PARTITION.
      *                 The metadata may be null if the client passed null 
record to
      *                 {@link 
org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
      * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
      */
-    void onAcknowledgement(RecordMetadata metadata, Exception exception);
+    default void onAcknowledgement(RecordMetadata metadata, Exception 
exception) {}
+
+    /**
+     * This method is called when the record sent to the server has been 
acknowledged, or when sending the record fails before
+     * it gets sent to the server.
+     * <p>
+     * This method is called just before the user callback is invoked, or in 
cases when
+     * <code>KafkaProducer.send()</code> throws an exception.
+     * throws an exception.
+     * <p>
+     * Note that any exception thrown by this method will be ignored by the 
caller.
+     * <p>
+     * The implementation of this method should be fast as it generally 
executes in a background
+     * I/O thread. A slow implementation could delay the sending of messages 
from other threads.
+
+     * Otherwise, sending of messages from other threads could be delayed.
+     *
+     * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset).
+     *                 If an error occurred, metadata will contain only valid 
topic and maybe
+     *                 partition. If partition is not given in ProducerRecord 
and an error occurs
+     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.UNKNOWN_PARTITION.
+     *                 The metadata may be null if the client passed null 
record to
+     *                 {@link 
org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
+     * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
+     * @param headers The headers for the record that was sent. This parameter 
may be null.

Review Comment:
   This suggestion makes sense to me. Thoughts @rich-c-shop ?



##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java:
##########
@@ -81,12 +82,40 @@ public interface ProducerInterceptor<K, V> extends 
Configurable, AutoCloseable {
      * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset).
      *                 If an error occurred, metadata will contain only valid 
topic and maybe
      *                 partition. If partition is not given in ProducerRecord 
and an error occurs
-     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.NO_PARTITION.
+     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.UNKNOWN_PARTITION.
      *                 The metadata may be null if the client passed null 
record to
      *                 {@link 
org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
      * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
      */
-    void onAcknowledgement(RecordMetadata metadata, Exception exception);
+    default void onAcknowledgement(RecordMetadata metadata, Exception 
exception) {}
+
+    /**
+     * This method is called when the record sent to the server has been 
acknowledged, or when sending the record fails before
+     * it gets sent to the server.
+     * <p>
+     * This method is called just before the user callback is invoked, or in 
cases when
+     * <code>KafkaProducer.send()</code> throws an exception.
+     * throws an exception.
+     * <p>
+     * Note that any exception thrown by this method will be ignored by the 
caller.
+     * <p>
+     * The implementation of this method should be fast as it generally 
executes in a background
+     * I/O thread. A slow implementation could delay the sending of messages 
from other threads.
+
+     * Otherwise, sending of messages from other threads could be delayed.
+     *
+     * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset).
+     *                 If an error occurred, metadata will contain only valid 
topic and maybe
+     *                 partition. If partition is not given in ProducerRecord 
and an error occurs
+     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.UNKNOWN_PARTITION.

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -1099,6 +1105,29 @@ private <T extends Serializer<String>> void 
doTestHeaders(Class<T> serializerCla
         producer.close(Duration.ofMillis(0));
     }
 
+    @SuppressWarnings("unchecked")

Review Comment:
   why do we need to suppress here? 
   
   If it's only because of the  `Serializer<String> keySerializer = 
mock(Serializer.class);` I would say that we should better fix it to 
`Serializer<String> keySerializer = mock(StringSerializer.class);`  and avoid 
swallowing the checks. Makes sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to