[ https://issues.apache.org/jira/browse/KAFKA-19650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18017368#comment-18017368 ]
Schubert Fernandes edited comment on KAFKA-19650 at 9/1/25 10:22 AM:
---------------------------------------------------------------------

I have a Kafka Streams application that is part of a pipeline of applications which process messages off a series of Kafka topics. One input message results in related messages on the different topics. The related messages on the downstream topics use the same message key as the input message.

As each application in the pipeline processes messages, the logging uses the Kafka message key on the logging context. The message key can thus be used to trace processing across the entire application pipeline via the application logging.

The Kafka Streams application has a registered _org.apache.kafka.clients.producer.ProducerInterceptor_ instance which is used to log any exceptions that occur during message publishing, or, on success, to log the partition and offset of the output message.

The problem is that _org.apache.kafka.clients.producer.ProducerInterceptor.onAcknowledgement(RecordMetadata, Exception)_ is called from the producer thread created by the Kafka clients library and it doesn't have the details of the producer record, so there is no way for my application to put the message key into the logging context for any logging to pick up.

In hindsight, suggesting adding the key to the metadata was a bad idea. A better solution would be to add an overloaded variant of the _onAcknowledgement_ method to _org.apache.kafka.clients.producer.ProducerInterceptor_ which takes the _org.apache.kafka.clients.producer.ProducerRecord<K, V>_ instance as an additional parameter.

From browsing through the Kafka clients code, this looks doable, but I'll leave that decision up to you guys.

Thanks for your time.


was (Author: schubertf):
I have a Kafka Streams application that is part of a pipeline of applications which process messages off a series of Kafka topics. One input message results in related messages on the different topics.
The related messages use the same message key as the input message on the other topics.

As each application in the pipeline processes messages the logging uses the Kafka message key on the logging context. The message key can thus be used to trace processing across the entire application pipeline via the application logging.

The Kafka Streams application has a registered _org.apache.kafka.clients.producer.ProducerInterceptor_ instance which is used to log any exceptions that occur during message publishing, or, on success, to log the partition and offset of the output message.

The problem is that _org.apache.kafka.clients.producer.ProducerInterceptor.onAcknowledgement(RecordMetadata, Exception)_ is called from the producer thread created by the Kafka clients library and it doesn't have the details of the producer record, so there is no way for my application to put the message key into the logging context for any logging to pick up.

In hindsight, suggesting adding the key to the metadata was a bad idea. A better solution would be to add an overloaded variant of the _onAcknowledgement_ method to _org.apache.kafka.clients.producer.ProducerInterceptor_ which takes the _org.apache.kafka.clients.producer.ProducerRecord<K, V>_ instance as another parameter.

From browsing through the Kafka clients code, this looks doable, but I'll leave that decision up to you guys.

Thanks for your time.


> Add message key to org.apache.kafka.clients.producer.RecordMetadata
> -------------------------------------------------------------------
>
>                 Key: KAFKA-19650
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19650
>             Project: Kafka
>          Issue Type: Wish
>          Components: clients, streams
>            Reporter: Schubert Fernandes
>            Priority: Minor
>              Labels: needs-kip
>
> Although the message key is not really _metadata_, it may be useful to
> include it in the _org.apache.kafka.clients.producer.RecordMetadata_ class so
> that metadata can be tied back to a specific message.
> When using a standard Kafka producer it is easy to tie back the metadata to a
> specific message by using the callback mechanism.
> However, when using Kafka Streams, the only way to access the metadata and
> log the details is to register a stream-level
> _org.apache.kafka.clients.producer.ProducerInterceptor_ instance.
> This mechanism has a drawback in that it is impossible to tie the
> RecordMetadata instance back to a particular message.
> Including the message key in the metadata would solve this problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
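
For illustration only, the overload suggested in the comment could look roughly like the sketch below. It is not Kafka code: _SketchRecord_, _SketchMetadata_, _SketchInterceptor_ and _KeyLoggingInterceptor_ are hypothetical, simplified stand-ins for _ProducerRecord_, _RecordMetadata_ and _ProducerInterceptor_, and the three-argument _onAcknowledgement_ does not exist in the Kafka clients library. The sketch assumes the overload would be added as a default method that delegates to the existing two-argument callback, so that existing interceptors keep working unchanged.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained sketch: every type here is a hypothetical stand-in, not the Kafka API.
class InterceptorSketch {

    // Simplified stand-in for ProducerRecord<K, V>.
    static final class SketchRecord<K, V> {
        final String topic;
        final K key;
        final V value;

        SketchRecord(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Simplified stand-in for RecordMetadata.
    static final class SketchMetadata {
        final String topic;
        final int partition;
        final long offset;

        SketchMetadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Stand-in for ProducerInterceptor, reduced to the acknowledgement callbacks.
    interface SketchInterceptor<K, V> {
        // Shape of the existing callback: the record is not available here.
        void onAcknowledgement(SketchMetadata metadata, Exception exception);

        // Hypothetical overload: a default method that delegates to the
        // two-argument callback, so existing implementations still compile.
        default void onAcknowledgement(SketchRecord<K, V> record,
                                       SketchMetadata metadata,
                                       Exception exception) {
            onAcknowledgement(metadata, exception);
        }
    }

    // Example interceptor that can tie each acknowledgement back to a message key.
    static final class KeyLoggingInterceptor implements SketchInterceptor<String, String> {
        final List<String> lines = new ArrayList<>();

        @Override
        public void onAcknowledgement(SketchMetadata metadata, Exception exception) {
            // Without the record, the key cannot be recovered.
            lines.add(describe("<unknown>", metadata, exception));
        }

        @Override
        public void onAcknowledgement(SketchRecord<String, String> record,
                                      SketchMetadata metadata,
                                      Exception exception) {
            lines.add(describe(record.key, metadata, exception));
        }

        private static String describe(String key, SketchMetadata m, Exception e) {
            if (e != null) {
                return "key=" + key + " failed: " + e.getMessage();
            }
            return "key=" + key + " topic=" + m.topic
                    + " partition=" + m.partition + " offset=" + m.offset;
        }
    }

    public static void main(String[] args) {
        KeyLoggingInterceptor interceptor = new KeyLoggingInterceptor();
        SketchRecord<String, String> record =
                new SketchRecord<>("output-topic", "order-42", "payload");
        interceptor.onAcknowledgement(record, new SketchMetadata("output-topic", 3, 17L), null);
        interceptor.onAcknowledgement(record, null, new RuntimeException("broker unavailable"));
        interceptor.lines.forEach(System.out::println);
    }
}
```

With the record passed in, the interceptor could place the key on the logging context before logging, which the two-argument callback cannot do. Whether the real interface could be extended this way is a question for the maintainers and, given the needs-kip label, presumably for a KIP.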