Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r223354146
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
---
@@ -556,29 +549,61 @@ private[kafka010] object KafkaSourceProvider extends
Logging {
this
}
+ def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+ // There are multiple possibilities to log in:
+ // - Token is provided -> try to log in with scram module using
kafka's dynamic JAAS
+ // configuration.
+ // - Token not provided -> try to log in with JVM global security
configuration
+ // which can be configured for example with
'java.security.auth.login.config'.
+ // For this no additional parameter needed.
+ KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) match {
+ case Some(jaasParams) =>
+ logInfo("Delegation token detected, using it for login.")
+ val mechanism = kafkaParams
+ .getOrElse(SaslConfigs.SASL_MECHANISM,
SaslConfigs.DEFAULT_SASL_MECHANISM)
+ require(mechanism.startsWith("SCRAM"),
+ "Delegation token works only with SCRAM mechanism.")
+ set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+ case None => // No params required
+ logInfo("Delegation token not found.")
+ }
+ this
+ }
+
def build(): ju.Map[String, Object] = map
}
private[kafka010] def kafkaParamsForProducer(
- parameters: Map[String, String]): Map[String, String] = {
+ parameters: Map[String, String]): ju.Map[String, Object] = {
val caseInsensitiveParams = parameters.map { case (k, v) =>
(k.toLowerCase(Locale.ROOT), v) }
if
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}"))
{
throw new IllegalArgumentException(
s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is
not supported as keys "
+ "are serialized with ByteArraySerializer.")
}
- if
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
- {
+ if
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
{
throw new IllegalArgumentException(
s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}'
is not supported as "
+ "value are serialized with ByteArraySerializer.")
}
+
+ val specifiedKafkaParams = convertToSpecifiedParams(parameters)
+
+ val configUpdater = ConfigUpdater("executor", specifiedKafkaParams)
--- End diff --
Fixed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]