Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22598#discussion_r222083143
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
---
@@ -560,25 +553,56 @@ private[kafka010] object KafkaSourceProvider extends
Logging {
}
private[kafka010] def kafkaParamsForProducer(
- parameters: Map[String, String]): Map[String, String] = {
+ parameters: Map[String, String]): ju.Map[String, Object] = {
val caseInsensitiveParams = parameters.map { case (k, v) =>
(k.toLowerCase(Locale.ROOT), v) }
if
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}"))
{
throw new IllegalArgumentException(
s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is
not supported as keys "
+ "are serialized with ByteArraySerializer.")
}
- if
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
- {
+ if
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
{
throw new IllegalArgumentException(
s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}'
is not supported as "
+ "value are serialized with ByteArraySerializer.")
}
+
+ val specifiedKafkaParams = convertToSpecifiedParams(parameters)
+
+ val configUpdater = ConfigUpdater("executor", specifiedKafkaParams)
+ .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
+ .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
+
+ setTokenJaasConfig(specifiedKafkaParams, configUpdater)
+
+ configUpdater.build()
+ }
+
+ private def convertToSpecifiedParams(parameters: Map[String, String]):
Map[String, String] = {
parameters
.keySet
.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
- .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
classOf[ByteArraySerializer].getName,
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
classOf[ByteArraySerializer].getName)
+ .toMap
+ }
+
+ private def setTokenJaasConfig(
--- End diff --
Moved.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]