[ 
https://issues.apache.org/jira/browse/KAFKA-19650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18017368#comment-18017368
 ] 

Schubert Fernandes edited comment on KAFKA-19650 at 9/1/25 10:22 AM:
---------------------------------------------------------------------

I have a Kafka Streams application that is part of a pipeline of applications 
which process messages off a series of Kafka topics.
One input message results in related messages on the different topics. The 
related messages on the downstream topics use the same message key as the input 
message.
As each application in the pipeline processes messages, the logging uses the 
Kafka message key on the logging context  The message key can thus be used to 
trace processing across the entire application pipeline via the application 
logging.

The Kafka Streams application has a registered 
_org.apache.kafka.clients.producer.ProducerInterceptor_ instance which is used 
to log any exceptions that occur during message publishing, or, on success, to 
log the partition and offset of the output message{_}.{_}
The problem is that 
_org.apache.kafka.clients.producer.ProducerInterceptor.onAcknowledgement(RecordMetadata,
 Exception)_ is called from the producer thread created by the Kafka clients 
library and it doesn't have the details of the producer record, so there is no 
way for my application to put the message key into the logging context for any 
logging to pick up.
In hindsight, suggesting adding the key to the metadata was a bad idea.
A better solution would be to add an overloaded variant of the 
_onAcknowledgement_ method to 
_org.apache.kafka.clients.producer.ProducerInterceptor_ which takes the 
_org.apache.kafka.clients.producer.ProducerRecord<K, V>_ instance as an 
additional parameter.
>From browsing through the Kafka clients code, this looks doable, but I'll 
>leave that decision up to you guys.
Thanks for your time.


was (Author: schubertf):
I have a Kafka Streams application that is part of a pipeline of applications 
which process messages off a series of Kafka topics.
One input message results in related messages on the different topics. The 
related messages use the same message key as the input message on the other 
topics.
As each application in the pipeline processes messages the logging uses the 
Kafka message key on the logging context  The message key can thus be used to 
trace processing across the entire application pipeline via the application 
logging.

The Kafka Streams application has a registered 
_org.apache.kafka.clients.producer.ProducerInterceptor_ instance which is used 
to log any exceptions that occur during message publishing, or, on success, to 
log the partition and offset of the output message{_}.{_}
The problem is that 
_org.apache.kafka.clients.producer.ProducerInterceptor.onAcknowledgement(RecordMetadata,
 Exception)_ is called from the producer thread created by the Kafka clients 
library and it doesn't have the details of the producer record, so there is no 
way for my application to put the message key into the logging context for any 
logging to pick up.
In hindsight, suggesting adding the key to the metadata was a bad idea.
A better solution would be to add an overloaded variant of the 
_onAcknowledgement_ method to 
_org.apache.kafka.clients.producer.ProducerInterceptor_ which takes the 
_org.apache.kafka.clients.producer.ProducerRecord<K, V>_ instance as another 
parameter.
>From browsing through the Kafka clients code, this looks doable, but I'll 
>leave that decision up to you guys.
Thanks for your time.

> Add message key to org.apache.kafka.clients.producer.RecordMetadata
> -------------------------------------------------------------------
>
>                 Key: KAFKA-19650
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19650
>             Project: Kafka
>          Issue Type: Wish
>          Components: clients, streams
>            Reporter: Schubert Fernandes
>            Priority: Minor
>              Labels: needs-kip
>
> Although the message key is not really {_}metadata{_}, it may be useful to 
> include it in the _org.apache.kafka.clients.producer.RecordMetadata_ class so 
> that metdata can be tied back to a specific message.
> When using a standard Kafka producer it is easy to tie back the metadata to a 
> specific message by using the callback mechanism.
> However, when using Kafka streams, the only way to access the metadata and 
> log the details is to register a stream-level 
> _org.apache.kafka.clients.producer.ProducerInterceptor_ instance.
> This mechanism has a drawback in that it is impossible to tie the 
> RecordMetadata instance back to a particular message. 
> Including the message key in the metadata would solve this problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to