skonto commented on a change in pull request #24613: [SPARK-27549][SS] Add
support for committing kafka offsets per batch for supporting external tooling
URL: https://github.com/apache/spark/pull/24613#discussion_r284849396
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -110,6 +110,34 @@ private[kafka010] class KafkaOffsetReader(
     kafkaReaderThread.shutdown()
   }
+  /**
+   * Commits offsets to the consumer_offsets topic
+   * for the driver consumer.
+   */
+  def commitOffsetsAsync(
+      partitionToOffsets: Map[TopicPartition, Long],
+      s: KafkaCommitsSource)
+    : Unit = {
+    if (_consumer != null) runUninterruptibly {
Review comment:
I just re-use the existing variable, as in the `close` method. Basically, the
`consume` method will try to initialize it; here we assume a consumer already
exists, and if it does not, something went wrong and we don't want to initialize it.
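
For illustration only, a minimal sketch of the guarded async-commit pattern being
discussed, assuming an already-initialized Kafka consumer. The object name
`CommitSketch`, the `consumer` parameter, and the callback wiring are hypothetical;
the actual body of `commitOffsetsAsync` after the `_consumer != null` check (and
inside `runUninterruptibly`) is not shown in this excerpt.

    // Hedged sketch, not the PR implementation: commits already-known offsets
    // asynchronously on a consumer that is assumed to exist, mirroring the
    // "assume the consumer is initialized, never create it here" reasoning above.
    import java.{util => ju}

    import scala.collection.JavaConverters._

    import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback}
    import org.apache.kafka.common.TopicPartition

    object CommitSketch {
      def commitOffsetsAsync(
          consumer: Consumer[Array[Byte], Array[Byte]],
          partitionToOffsets: Map[TopicPartition, Long]): Unit = {
        // Only commit when a consumer already exists; initializing one here would
        // hide the fact that something upstream went wrong.
        if (consumer != null) {
          val offsets: ju.Map[TopicPartition, OffsetAndMetadata] =
            partitionToOffsets.map { case (tp, offset) =>
              tp -> new OffsetAndMetadata(offset)
            }.asJava
          consumer.commitAsync(offsets, new OffsetCommitCallback {
            override def onComplete(
                committed: ju.Map[TopicPartition, OffsetAndMetadata],
                exception: Exception): Unit = {
              if (exception != null) {
                // The real code would use Spark's logging; println keeps the sketch self-contained.
                println(s"Async offset commit failed: ${exception.getMessage}")
              }
            }
          })
        }
      }
    }

As the excerpt above shows, the PR itself guards with `_consumer != null` and runs
the commit through `runUninterruptibly`, which is how `KafkaOffsetReader` keeps
Kafka consumer calls off interruptible threads.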