gaborgsomogyi commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r499568889
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -46,39 +49,40 @@ import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner
 private[kafka010] class KafkaOffsetReader(
     consumerStrategy: ConsumerStrategy,
     val driverKafkaParams: ju.Map[String, Object],
-    readerOptions: CaseInsensitiveMap[String],
-    driverGroupIdPrefix: String) extends Logging {
+    readerOptions: CaseInsensitiveMap[String]) extends Logging {
   /**
-   * [[UninterruptibleThreadRunner]] ensures that all [[KafkaConsumer]] communication called in an
+   * [[UninterruptibleThreadRunner]] ensures that all Kafka communication called in an
    * [[UninterruptibleThread]]. In the case of streaming queries, we are already running in an
    * [[UninterruptibleThread]], however for batch mode this is not the case.
    */
   val uninterruptibleThreadRunner = new UninterruptibleThreadRunner("Kafka Offset Reader")
-  /**
-   * Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is
-   * created -- see SPARK-19564.
-   */
-  private var groupId: String = null
-  private var nextId = 0
-
-  /**
-   * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
-   * offsets and never commits them.
-   */
-  @volatile protected var _consumer: Consumer[Array[Byte], Array[Byte]] = null
+  @volatile protected var _admin: Admin = null
-  protected def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized {
+  protected def admin: Admin = synchronized {
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
-    if (_consumer == null) {
-      val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
-      if (driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
-        newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
Review comment:
There have been other discussions touching this area, but they are scattered, so here is a summary. On the driver the `group.id` is no longer used, but the executors still use it, which is why the configuration option is kept. The end-user impact is that `group.id`-based authorization no longer works on the driver (as noted, topic-based authorization is a good alternative). This is a breaking change.
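
The diff above replaces the driver-side `KafkaConsumer` with a Kafka `Admin` client, which needs no `group.id` at all. A minimal sketch of that idea, not the actual Spark code: the object and method names here are made up, and it assumes the kafka-clients 2.5+ `Admin.listOffsets` API.

```scala
// Sketch only: query latest offsets on the driver via the Kafka Admin API.
// No consumer group is created or joined, so no group.id is required.
import java.{util => ju}

import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, OffsetSpec}
import org.apache.kafka.common.TopicPartition

object LatestOffsetsSketch {
  def latestOffsets(
      bootstrapServers: String,
      partitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
    val props = new ju.Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    val admin = Admin.create(props)
    try {
      // Build the per-partition request asking for the latest offset.
      val request = new ju.HashMap[TopicPartition, OffsetSpec]()
      partitions.foreach(tp => request.put(tp, OffsetSpec.latest()))
      val result = admin.listOffsets(request).all().get()
      partitions.map(tp => tp -> result.get(tp).offset()).toMap
    } finally {
      admin.close()
    }
  }
}
```

Because only metadata is read and nothing is committed, this path never touches consumer-group state on the broker.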
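
For the authorization impact: instead of granting `Read` on a consumer group, access can be granted on the topic itself. A hedged sketch using the Admin API's `createAcls`; the principal (`User:spark-app`) and topic (`events`) are hypothetical examples, and `admin` is assumed to be an `org.apache.kafka.clients.admin.Admin` instance.

```scala
// Sketch: grant topic-based read access instead of group-based access.
// Principal and topic names below are made-up illustrations.
import java.{util => ju}

import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclOperation, AclPermissionType}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}

val aclBinding = new AclBinding(
  new ResourcePattern(ResourceType.TOPIC, "events", PatternType.LITERAL),
  new AccessControlEntry("User:spark-app", "*", AclOperation.READ, AclPermissionType.ALLOW))

// `admin` is a connected Admin client, created as in the previous sketch.
admin.createAcls(ju.Collections.singletonList(aclBinding)).all().get()
```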
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]