skonto edited a comment on issue #24613: [SPARK-27549][SS] Add support for committing kafka offsets per batch for supporting external tooling
URL: https://github.com/apache/spark/pull/24613#issuecomment-493961670

@gaborgsomogyi

> A high level design question. [Here](https://gist.github.com/gaborgsomogyi/3410f6ec32b1d7bf2e2cb8e618d7d37f) is how Spark generates consumer group IDs. How is it planned to track lag if 2 queries are reading the same topic?

Each query generates its own [unique consumer group id](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L81). Below is the example from this PR's description, extended with one more query:

```
[spark-kafka-source-413106f6-9d4f-491d-8782-f1f800acb6fe-1453634933-driver-0,TutorialTopic5,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1558354165044, expireTimestamp=None)
[spark-kafka-source-0b4ac39b-98d0-4e94-ae01-f4168bebae2a-1568478783-driver-0,TutorialTopic5,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1558354165044, expireTimestamp=None)
[spark-kafka-source-413106f6-9d4f-491d-8782-f1f800acb6fe-1453634933-driver-0,TutorialTopic5,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1558354170047, expireTimestamp=None)
[spark-kafka-source-0b4ac39b-98d0-4e94-ae01-f4168bebae2a-1568478783-driver-0,TutorialTopic5,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1558354170051, expireTimestamp=None)
[spark-kafka-source-413106f6-9d4f-491d-8782-f1f800acb6fe-1453634933-driver-0,TutorialTopic5,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1558354175022, expireTimestamp=None)
[spark-kafka-source-0b4ac39b-98d0-4e94-ae01-f4168bebae2a-1568478783-driver-0,TutorialTopic5,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1558354175028, expireTimestamp=None)
[spark-kafka-source-413106f6-9d4f-491d-8782-f1f800acb6fe-1453634933-driver-0,TutorialTopic5,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1558354180045, expireTimestamp=None)
[spark-kafka-source-0b4ac39b-98d0-4e94-ae01-f4168bebae2a-1568478783-driver-0,TutorialTopic5,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1558354180045, expireTimestamp=None)
```

Monitoring tools can filter the records in the `__consumer_offsets` topic based on these group ids. In addition, metadata could be attached based on the info available [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L195), so that the query id is passed to the commit call [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L416).
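To illustrate the filtering idea, here is a minimal Java sketch of how external tooling could recognize Spark-generated group ids and pull out the embedded query id. The regex is an assumption inferred from the ids in the log above (`spark-kafka-source-<UUID>-<int>-driver-<n>`), and `queryIdOf` is a hypothetical helper, not part of Spark or the Kafka client API:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SparkGroupId {
    // Assumed shape of Spark's generated group ids, based on the log above:
    // "spark-kafka-source-<query UUID>-<random int>-driver-<n>".
    private static final Pattern GROUP_ID = Pattern.compile(
        "spark-kafka-source-([0-9a-f-]{36})-(-?\\d+)-driver-\\d+");

    /** Returns the query UUID embedded in a Spark-generated group id, if any. */
    static Optional<String> queryIdOf(String groupId) {
        Matcher m = GROUP_ID.matcher(groupId);
        return m.matches() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Group id taken from the commit log above.
        String g = "spark-kafka-source-413106f6-9d4f-491d-8782-f1f800acb6fe-1453634933-driver-0";
        System.out.println(queryIdOf(g).orElse("not a Spark group"));
        // Non-Spark consumer groups are skipped by the monitoring filter.
        System.out.println(queryIdOf("my-app-consumer").isPresent());
    }
}
```

A monitoring tool consuming `__consumer_offsets` could apply such a filter per record, keying lag per query UUID rather than per raw group id, since the random suffix changes across restarts.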