Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20096#discussion_r159731324
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -181,26 +223,22 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         }
       }
     
    -  private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = {
    -    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    -    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
    -      throw new IllegalArgumentException(
    -        s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys "
    -          + "are serialized with ByteArraySerializer.")
    -    }
    +  override def createContinuousWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = {
    +    import scala.collection.JavaConverters._
     
    -    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
    -    {
    -      throw new IllegalArgumentException(
    -        s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as "
    -          + "value are serialized with ByteArraySerializer.")
    -    }
    -    parameters
    -      .keySet
    -      .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    -      .map { k => k.drop(6).toString -> parameters(k) }
    -      .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
    -        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
    +    val spark = SparkSession.getActiveSession.get
    +    val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
    +    // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
    +    val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
    +
    +    KafkaWriter.validateQuery(
    +      schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)
    +
    +    java.util.Optional.of(new ContinuousKafkaWriter(topic, producerParams, schema))
    --- End diff ---
    
    Import java.util.Optional. It's being used in multiple places in this file.
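    
    For illustration, a minimal, self-contained sketch of what the suggestion buys at the call sites (the object and method names below are hypothetical, not part of KafkaSourceProvider):
    
        import java.util.Optional
    
        object OptionalImportSketch {
          // With java.util.Optional imported once at the top of the file,
          // Java-facing return types and wrapping calls can refer to Optional
          // directly instead of spelling out the fully qualified name.
          def wrap[T](value: T): Optional[T] = Optional.of(value)
    
          def main(args: Array[String]): Unit = {
            val wrapped: Optional[String] = wrap("some-topic")
            println(wrapped.get()) // prints "some-topic"
          }
        }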

