Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r223354097
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
---
@@ -556,29 +549,61 @@ private[kafka010] object KafkaSourceProvider extends
Logging {
this
}
+ def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+ // There are multiple possibilities to log in:
+ // - Token is provided -> try to log in with scram module using
kafka's dynamic JAAS
+ // configuration.
+ // - Token not provided -> try to log in with JVM global security
configuration
+ // which can be configured for example with
'java.security.auth.login.config'.
+ // For this no additional parameter needed.
+ KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) match {
+ case Some(jaasParams) =>
+ logInfo("Delegation token detected, using it for login.")
+ val mechanism = kafkaParams
+ .getOrElse(SaslConfigs.SASL_MECHANISM,
SaslConfigs.DEFAULT_SASL_MECHANISM)
+ require(mechanism.startsWith("SCRAM"),
+ "Delegation token works only with SCRAM mechanism.")
+ set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+ case None => // No params required
+ logInfo("Delegation token not found.")
--- End diff --
Fixed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]