rich-c-shop commented on code in PR #17099:
URL: https://github.com/apache/kafka/pull/17099#discussion_r1828626942
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java:
##########
@@ -82,11 +84,12 @@ public ProducerRecord<K, V> onSend(ProducerRecord<K, V>
record) {
* @param metadata The metadata for the record that was sent (i.e. the
partition and offset).
* If an error occurred, metadata will only contain valid
topic and maybe partition.
* @param exception The exception thrown during processing of this record.
Null if no error occurred.
+ * @param headers The headers for the record that was sent
*/
- public void onAcknowledgement(RecordMetadata metadata, Exception
exception) {
+ public void onAcknowledgement(RecordMetadata metadata, Exception
exception, Headers headers) {
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
- interceptor.onAcknowledgement(metadata, exception);
+ interceptor.onAcknowledgement(metadata, exception, headers);
Review Comment:
Addressed in commit e27c61c191.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java:
##########
@@ -107,14 +110,21 @@ public void onAcknowledgement(RecordMetadata metadata,
Exception exception) {
public void onSendError(ProducerRecord<K, V> record, TopicPartition
interceptTopicPartition, Exception exception) {
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
+ Headers headers = record != null ? record.headers() : null;
+ if (headers instanceof RecordHeaders) {
Review Comment:
Addressed in commit e27c61c191.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java:
##########
@@ -107,14 +110,21 @@ public void onAcknowledgement(RecordMetadata metadata,
Exception exception) {
public void onSendError(ProducerRecord<K, V> record, TopicPartition
interceptTopicPartition, Exception exception) {
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
+ Headers headers = record != null ? record.headers() : null;
+ if (headers instanceof RecordHeaders) {
+ // make a copy of the headers to make sure it is read-only
Review Comment:
Addressed in commit e27c61c191.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org