tashoyan opened a new pull request #28623:
URL: https://github.com/apache/spark/pull/28623


   ### What changes were proposed in this pull request?
   
   Fix for [SPARK-31805](https://issues.apache.org/jira/browse/SPARK-31805): do not set the `group.id` consumer property when using the "assign" strategy.
   
   ### Why are the changes needed?
   
   With a secure Kafka broker, the application fails because the broker does not authorize the auto-generated group id:
   ```text
   org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-relation-ecab045d-4ee6-425e-88a0-495d4100a013-driver-0
   ```
   
   With the "assign" strategy, the consumer group is not used, so the best fix is to exclude the `group.id` property from the consumer config.
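A minimal sketch of the idea, assuming consumer properties are held as a plain `Map[String, String]`. The object `ConsumerParamsSketch`, its `TopicSelection` cases and the `consumerParams` function are hypothetical names for illustration, not Spark's internal API; only the key `group.id` is the real Kafka consumer property. Under "assign" nothing is generated, while subscription-style selection keeps falling back to a generated id.

```scala
object ConsumerParamsSketch {
  // Hypothetical model of how the user selects topics; not Spark's internal API.
  sealed trait TopicSelection
  final case class Assign(topicPartitionsJson: String) extends TopicSelection
  final case class Subscribe(topics: Seq[String]) extends TopicSelection

  // Real Kafka consumer property name.
  val GroupId = "group.id"

  /** Returns consumer properties, adding a generated group.id only when a group is used. */
  def consumerParams(
      base: Map[String, String],
      selection: TopicSelection,
      generateGroupId: () => String): Map[String, String] =
    selection match {
      // "assign": partitions are listed explicitly and no consumer group is used,
      // so no group.id is generated (an explicitly configured one passes through).
      case Assign(_) => base
      // Subscription: keep a user-supplied group.id, otherwise generate one.
      case Subscribe(_) =>
        if (base.contains(GroupId)) base
        else base + (GroupId -> generateGroupId())
    }
}
```

Passing an explicitly configured `group.id` through under "assign", rather than deleting it, is a design choice of this sketch; the description above only says that the property should not be set automatically.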
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. When using the "assign" strategy:
   1. There is no need to reconfigure the broker to add the necessary group ids to its ACLs.
   1. (Since Spark 3.0.0) There is no need to provide a custom group id ([SPARK-26350](https://issues.apache.org/jira/browse/SPARK-26350)) or a custom group id prefix ([SPARK-26121](https://issues.apache.org/jira/browse/SPARK-26121)).
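The two workarounds that become unnecessary both come down to how the group id is chosen. Per the Spark 3.0 Kafka integration guide, an explicit `kafka.group.id` option takes precedence, and otherwise `groupIdPrefix` is used for generated ids. The sketch below models only that precedence; `resolveGroupId` is a hypothetical function, and the default prefix and the `-driver-0` suffix are copied from the error message above rather than from Spark's source. Because the default id embeds a fresh UUID, it cannot be known in advance.

```scala
import java.util.UUID

object GroupIdResolution {
  /** Hypothetical: which group.id a consumer ends up with (sketch of pre-fix behaviour). */
  def resolveGroupId(
      customGroupId: Option[String], // value of the "kafka.group.id" option, if set
      customPrefix: Option[String],  // value of the "groupIdPrefix" option, if set
      defaultPrefix: String = "spark-kafka-relation"): String =
    customGroupId.getOrElse {
      // No explicit id: generate a unique one, so it changes on every run.
      val prefix = customPrefix.getOrElse(defaultPrefix)
      s"$prefix-${UUID.randomUUID()}-driver-0"
    }
}

// Workaround 1: an explicit id that the broker's ACLs already allow.
val explicitId = GroupIdResolution.resolveGroupId(Some("acl-allowed-group"), None)
// Workaround 2: a prefix that a prefix-based ACL on the broker can match.
val prefixedId = GroupIdResolution.resolveGroupId(None, Some("acl-allowed-prefix"))
```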
   
   
   ### How was this patch tested?
   
   Manually:
   1. Rebuild the module:
       ```bash
       mvn install -pl :spark-sql-kafka-0-10_2.12
       ```
   1. Rebuild my application with the newly built `spark-sql-kafka-0-10_2.12`. My application does the following:
       ```scala
       val kafkaDf = spark.read
         .format("kafka")
         // "kafka.bootstrap.servers" and SASL-specific options
         .options(options)
         .option("assign", topicPartitionsJson)
         .option("startingOffsets", startingOffsetsJson)
         .option("endingOffsets", "latest")
         .load()
       ```
   1. Run my application against a secure Kafka broker and verify that it no longer fails with "GroupAuthorizationException: ...".
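The application snippet leaves `topicPartitionsJson` and `startingOffsetsJson` undefined. One way to build them is sketched below; `KafkaJsonOptions`, `assignJson` and `offsetsJson` are hypothetical helpers, while the JSON shapes follow the Spark Kafka integration guide for the `assign` and `startingOffsets` options, where `-2` stands for earliest and `-1` for latest.

```scala
object KafkaJsonOptions {
  // Kafka topic names may only contain ASCII alphanumerics, '.', '_' and '-',
  // so plain quoting is enough and no JSON escaping is needed here.
  private def quote(s: String): String = "\"" + s + "\""

  /** Builds e.g. {"topicA":[0,1]} for the "assign" option. */
  def assignJson(partitions: Seq[(String, Seq[Int])]): String =
    partitions
      .map { case (topic, ps) => s"${quote(topic)}:[${ps.mkString(",")}]" }
      .mkString("{", ",", "}")

  /** Builds e.g. {"topicA":{"0":23,"1":-2}}; -2 = earliest, -1 = latest. */
  def offsetsJson(offsets: Seq[(String, Seq[(Int, Long)])]): String =
    offsets
      .map { case (topic, partitionOffsets) =>
        partitionOffsets
          .map { case (partition, offset) => s"${quote(partition.toString)}:$offset" }
          .mkString(quote(topic) + ":{", ",", "}")
      }
      .mkString("{", ",", "}")
}

// Read partitions 0 and 1 of "topicA": partition 0 from offset 23, partition 1 from earliest.
val topicPartitionsJson = KafkaJsonOptions.assignJson(Seq("topicA" -> Seq(0, 1)))
val startingOffsetsJson =
  KafkaJsonOptions.offsetsJson(Seq("topicA" -> Seq(0 -> 23L, 1 -> -2L)))
```

A `Seq` of pairs is used instead of a `Map` so the generated JSON keeps the caller's ordering.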
   
   I did not manage to add unit tests because I do not know how to set up a secure Kafka broker. Unit tests in spark-sql-kafka-0-10 use `KafkaTestUtils`, and I am not sure whether it can simulate a secure Kafka broker with ACLs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.