Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22598#discussion_r223354146
  
    --- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
    @@ -556,29 +549,61 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
           this
         }
     
    +    def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
    +      // There are multiple possibilities to log in:
    +      // - Token is provided -> try to log in with scram module using 
kafka's dynamic JAAS
    +      //   configuration.
    +      // - Token not provided -> try to log in with JVM global security 
configuration
    +      //   which can be configured for example with 
'java.security.auth.login.config'.
    +      //   For this no additional parameter needed.
    +      KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) match {
    +        case Some(jaasParams) =>
    +          logInfo("Delegation token detected, using it for login.")
    +          val mechanism = kafkaParams
    +            .getOrElse(SaslConfigs.SASL_MECHANISM, 
SaslConfigs.DEFAULT_SASL_MECHANISM)
    +          require(mechanism.startsWith("SCRAM"),
    +            "Delegation token works only with SCRAM mechanism.")
    +          set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
    +        case None => // No params required
    +          logInfo("Delegation token not found.")
    +      }
    +      this
    +    }
    +
         def build(): ju.Map[String, Object] = map
       }
     
       private[kafka010] def kafkaParamsForProducer(
    -      parameters: Map[String, String]): Map[String, String] = {
    +      parameters: Map[String, String]): ju.Map[String, Object] = {
         val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
         if 
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}"))
 {
           throw new IllegalArgumentException(
             s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is 
not supported as keys "
               + "are serialized with ByteArraySerializer.")
         }
     
    -    if 
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
    -    {
    +    if 
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
 {
           throw new IllegalArgumentException(
             s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' 
is not supported as "
               + "value are serialized with ByteArraySerializer.")
         }
    +
    +    val specifiedKafkaParams = convertToSpecifiedParams(parameters)
    +
    +    val configUpdater = ConfigUpdater("executor", specifiedKafkaParams)
    --- End diff --
    
    Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to