gaborgsomogyi commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r500117329
##########
File path:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -46,39 +49,40 @@ import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner}
 private[kafka010] class KafkaOffsetReader(
     consumerStrategy: ConsumerStrategy,
     val driverKafkaParams: ju.Map[String, Object],
-    readerOptions: CaseInsensitiveMap[String],
-    driverGroupIdPrefix: String) extends Logging {
+    readerOptions: CaseInsensitiveMap[String]) extends Logging {
   /**
-   * [[UninterruptibleThreadRunner]] ensures that all [[KafkaConsumer]] communication called in an
+   * [[UninterruptibleThreadRunner]] ensures that all Kafka communication called in an
    * [[UninterruptibleThread]]. In the case of streaming queries, we are already running in an
    * [[UninterruptibleThread]], however for batch mode this is not the case.
    */
   val uninterruptibleThreadRunner = new UninterruptibleThreadRunner("Kafka Offset Reader")
-  /**
-   * Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is
-   * created -- see SPARK-19564.
-   */
-  private var groupId: String = null
-  private var nextId = 0
-
-  /**
-   * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
-   * offsets and never commits them.
-   */
Review comment:
OK, added the comment back, and for `KafkaConsumer` I mentioned why the offset commit behavior is important.
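
For illustration only, here is a minimal Scala sketch of the behavior that comment describes: a driver-side consumer that is configured with `enable.auto.commit=false`, only queries the latest offsets via `endOffsets`, and never calls `commitSync`/`commitAsync`, so it can never overwrite a consumer group's committed offsets. The names `DriverOffsetProbe` and `latestOffsets` are hypothetical and are not part of this PR or of `KafkaOffsetReader`.

```scala
import java.{util => ju}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hypothetical driver-side helper (not the PR's code): it only *reads* the latest
// offsets and never commits anything, so it cannot disturb committed group offsets.
object DriverOffsetProbe {

  def latestOffsets(
      bootstrapServers: String,
      partitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
    val props = new ju.Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    // Auto-commit stays off: this consumer exists purely to query offsets.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      val juParts = new ju.ArrayList[TopicPartition]()
      partitions.foreach(juParts.add)
      // endOffsets only fetches metadata from the brokers; nothing is committed.
      val end = consumer.endOffsets(juParts)
      val builder = Map.newBuilder[TopicPartition, Long]
      val it = end.entrySet().iterator()
      while (it.hasNext) {
        val e = it.next()
        builder += e.getKey -> e.getValue.longValue()
      }
      builder.result()
    } finally {
      consumer.close()
    }
  }
}
```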