Github user dongjinleekr commented on a diff in the pull request: https://github.com/apache/spark/pull/22282#discussion_r214856258 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala --- @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter( throw new NullPointerException(s"null topic present in the data. Use the " + s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.") } - val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value) + val record = if (projectedRow.isNullAt(3)) { + new ProducerRecord[Array[Byte], Array[Byte]]( + topic.toString, + null, + key, + value + ) + } else { + val headerMap = projectedRow.getMap(3) + val headers = (0 until headerMap.numElements()).toArray.map( + i => + new RecordHeader( --- End diff -- As of September 2018, `RecordHeader` is the only implementation provided by Kafka. As you can see [here](https://memorynotfound.com/spring-kafka-adding-custom-header-kafka-message-example/), this way is widely used - I think it is more natural for `RecordHeader` to be hidden by some builder classes but it's not. It seems a missing spot.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org