Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20096#discussion_r159731324
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -181,26 +223,22 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
   }

-  private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = {
-    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
-    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
-      throw new IllegalArgumentException(
-        s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys "
-          + "are serialized with ByteArraySerializer.")
-    }
+  override def createContinuousWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = {
+    import scala.collection.JavaConverters._

-    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
-    {
-      throw new IllegalArgumentException(
-        s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as "
-          + "value are serialized with ByteArraySerializer.")
-    }
-    parameters
-      .keySet
-      .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-      .map { k => k.drop(6).toString -> parameters(k) }
-      .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
-        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
+    val spark = SparkSession.getActiveSession.get
+    val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
+    // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
+    val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
+
+    KafkaWriter.validateQuery(
+      schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)
+
+    java.util.Optional.of(new ContinuousKafkaWriter(topic, producerParams, schema))
--- End diff ---
import java.util.Optional. It's being used in multiple places in this file.
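A rough sketch of what that suggestion would look like here (not part of the patch; it simply reuses the method body from the diff above and assumes the import goes at the top of the file with the other imports):

    import java.util.Optional

    // With the import in place, the fully qualified java.util.Optional
    // references in this method (and in the other places in this file that
    // use it) can be shortened:
    override def createContinuousWriter(
        queryId: String,
        schema: StructType,
        mode: OutputMode,
        options: DataSourceV2Options): Optional[ContinuousWriter] = {
      import scala.collection.JavaConverters._

      val spark = SparkSession.getActiveSession.get
      val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
      val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)

      KafkaWriter.validateQuery(
        schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)

      // java.util.Optional.of(...) becomes Optional.of(...)
      Optional.of(new ContinuousKafkaWriter(topic, producerParams, schema))
    }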
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]