Github user dongjinleekr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214856258
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
           throw new NullPointerException(s"null topic present in the data. Use the " +
             s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
         }
    -    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
    +    val record = if (projectedRow.isNullAt(3)) {
    +      new ProducerRecord[Array[Byte], Array[Byte]](
    +        topic.toString,
    +        null,
    +        key,
    +        value
    +      )
    +    } else {
    +      val headerMap = projectedRow.getMap(3)
    +      val headers = (0 until headerMap.numElements()).toArray.map(
    +        i =>
    +          new RecordHeader(
    --- End diff --
    
    As of September 2018, `RecordHeader` is the only `Header` implementation that 
Kafka provides. As you can see 
[here](https://memorynotfound.com/spring-kafka-adding-custom-header-kafka-message-example/),
 instantiating it directly is a widely used pattern. I think it would be more 
natural for `RecordHeader` to be hidden behind a builder class, but Kafka does 
not offer one. That looks like a gap in the API.
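
    For illustration, here is a minimal sketch of that pattern outside of Spark, 
    assuming `kafka-clients` is on the classpath. `HeaderExample`, `toHeaders`, 
    the topic name, and the header values are hypothetical and are not part of 
    this PR; only `RecordHeader`, `Header`, and the `ProducerRecord` constructor 
    come from Kafka.

    ```scala
    import java.nio.charset.StandardCharsets

    import scala.collection.JavaConverters._

    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.header.Header
    import org.apache.kafka.common.header.internals.RecordHeader

    object HeaderExample {
      // Hypothetical helper: converts a plain Scala map into Kafka headers by
      // instantiating RecordHeader directly, since Kafka offers no builder.
      def toHeaders(kv: Map[String, String]): Seq[Header] =
        kv.toSeq.map { case (k, v) =>
          new RecordHeader(k, v.getBytes(StandardCharsets.UTF_8)): Header
        }

      def main(args: Array[String]): Unit = {
        val headers = toHeaders(Map("trace-id" -> "abc123"))

        // The partition is left null so that the producer's partitioner picks it.
        val record = new ProducerRecord[Array[Byte], Array[Byte]](
          "example-topic",
          null: java.lang.Integer,
          "k".getBytes(StandardCharsets.UTF_8),
          "v".getBytes(StandardCharsets.UTF_8),
          headers.asJava)

        // Print each header key attached to the record.
        record.headers().asScala.foreach(h => println(h.key()))
      }
    }
    ```

    The import from `org.apache.kafka.common.header.internals` is the part that 
    feels unnatural: callers have to reach into an `internals` package to build 
    a header.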


---

