Github user dongjinleekr commented on a diff in the pull request:
https://github.com/apache/spark/pull/22282#discussion_r214856258
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
---
@@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
throw new NullPointerException(s"null topic present in the data. Use
the " +
s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a
default topic.")
}
- val record = new ProducerRecord[Array[Byte],
Array[Byte]](topic.toString, key, value)
+ val record = if (projectedRow.isNullAt(3)) {
+ new ProducerRecord[Array[Byte], Array[Byte]](
+ topic.toString,
+ null,
+ key,
+ value
+ )
+ } else {
+ val headerMap = projectedRow.getMap(3)
+ val headers = (0 until headerMap.numElements()).toArray.map(
+ i =>
+ new RecordHeader(
--- End diff --
As of September 2018, `RecordHeader` is the only implementation provided by
Kafka. As you can see
[here](https://memorynotfound.com/spring-kafka-adding-custom-header-kafka-message-example/),
this way is widely used - I think it is more natural for `RecordHeader` to be
hidden by some builder classes but it's not. It seems a missing spot.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]