[ https://issues.apache.org/jira/browse/KAFKA-10236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vinodhini updated KAFKA-10236:
------------------------------
    Description:
Coming from [https://stackoverflow.com/questions/62700075/is-there-a-way-to-get-committed-offset-in-eos-kafka-stream|https://stackoverflow.com/questions/62700075/is-there-a-way-to-get-committed-offset-in-eos-kafka-stream?]

*Background:*
Setting a consumer interceptor in StreamsConfig ensures that the interceptor(s) are called when messages are consumed or committed.

Snippet from {{org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsSync}}:
{code:java}
if (future.succeeded()) {
    if (interceptors != null)
        interceptors.onCommit(offsets);
    return true;
}
{code}
But {{consumerInterceptor.onCommit()}} was never called, even though I saw the offsets being committed for the source topic.

*Issue:*
I figured out that this was because I was using Kafka Streams with the exactly-once processing guarantee (EOS) enabled.
This was the logic at {{org.apache.kafka.streams.processor.internals.StreamTask#commit}}:
{code:java}
if (this.eosEnabled) {
    this.producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, this.applicationId);
    this.producer.commitTransaction();
    if (startNewTransaction) {
        this.producer.beginTransaction();
    }
} else {
    this.consumer.commitSync(consumedOffsetsAndMetadata);
}
{code}
As you can see, {{consumer.commitSync}}, which in turn calls {{consumerCoordinator.commit}}, which calls {{interceptor.onCommit}}, never gets called, because with EOS enabled it is the transaction API that gets invoked.

*Request:*
Provide a way to get the committed offsets from interceptors when EOS is enabled as well.

> Kafka Streams | onCommit interceptor with EOS enabled
> ------------------------------------------------------
>
>                 Key: KAFKA-10236
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10236
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Vinodhini
>            Priority: Major
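
The behaviour reported in this issue can be modelled with a small, self-contained sketch of the two commit paths. This is only an illustration under stated assumptions, not Kafka code: {{CommitListener}}, {{SketchConsumer}}, {{SketchProducer}}, {{Outcome}} and the partition name {{source-topic-0}} are all hypothetical stand-ins, and the shared map merely plays the role of the committed-offset store. It shows that when offsets are committed through the producer's transaction, they still end up stored, but the consumer-side commit callback is never reached.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two commit paths; none of these types are Kafka classes.
final class CommitPathSketch {

    /** Stand-in for the onCommit callback of a consumer interceptor. */
    interface CommitListener {
        void onCommit(Map<String, Long> offsets);
    }

    /** Stand-in consumer: a synchronous commit stores offsets, then notifies the listener. */
    static final class SketchConsumer {
        private final Map<String, Long> offsetStore;
        private final CommitListener listener;

        SketchConsumer(Map<String, Long> offsetStore, CommitListener listener) {
            this.offsetStore = offsetStore;
            this.listener = listener;
        }

        void commitSync(Map<String, Long> offsets) {
            offsetStore.putAll(offsets);
            if (listener != null) {
                listener.onCommit(offsets);
            }
        }
    }

    /** Stand-in transactional producer: offsets are stored when the transaction commits. */
    static final class SketchProducer {
        private final Map<String, Long> offsetStore;
        private final Map<String, Long> pending = new HashMap<>();

        SketchProducer(Map<String, Long> offsetStore) {
            this.offsetStore = offsetStore;
        }

        void sendOffsetsToTransaction(Map<String, Long> offsets) {
            pending.putAll(offsets);
        }

        void commitTransaction() {
            offsetStore.putAll(pending);
            pending.clear();
        }
    }

    /** Outcome of one simulated task commit. */
    static final class Outcome {
        final int listenerCalls;
        final Map<String, Long> storedOffsets;

        Outcome(int listenerCalls, Map<String, Long> storedOffsets) {
            this.listenerCalls = listenerCalls;
            this.storedOffsets = storedOffsets;
        }
    }

    /** Mirrors the branch quoted from StreamTask#commit, using the stand-ins above. */
    static Outcome commitOnce(boolean eosEnabled) {
        Map<String, Long> offsetStore = new HashMap<>();
        int[] listenerCalls = {0};
        SketchConsumer consumer = new SketchConsumer(offsetStore, offsets -> listenerCalls[0]++);
        SketchProducer producer = new SketchProducer(offsetStore);
        Map<String, Long> consumed = Collections.singletonMap("source-topic-0", 42L);

        if (eosEnabled) {
            // Transactional path: the consumer, and so its listener, is bypassed.
            producer.sendOffsetsToTransaction(consumed);
            producer.commitTransaction();
        } else {
            // Plain path: the commit goes through the consumer, which notifies the listener.
            consumer.commitSync(consumed);
        }
        return new Outcome(listenerCalls[0], offsetStore);
    }

    public static void main(String[] args) {
        for (boolean eos : new boolean[] {false, true}) {
            Outcome o = commitOnce(eos);
            System.out.println("eosEnabled=" + eos
                    + " listenerCalls=" + o.listenerCalls
                    + " storedOffsets=" + o.storedOffsets);
        }
    }
}
```

In this model the offset is stored on both paths, while the listener count stays at zero on the EOS path, which matches the symptom described above: offsets visibly committed, {{onCommit}} silent.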