Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22282#discussion_r214640616
--- Diff:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
---
@@ -59,14 +59,23 @@ class KafkaSinkSuite extends StreamTest with
SharedSQLContext with KafkaTest {
val topic = newTopic()
testUtils.createTopic(topic)
val df = Seq("1", "2", "3", "4", "5").map(v => (topic,
v)).toDF("topic", "value")
+ .withColumn("headers",
+ map(lit("x"),
col("value").plus(1).cast(IntegerType).cast(StringType).cast(BinaryType),
+ lit("y"),
col("value").multiply(2).cast(IntegerType).cast(StringType).cast(BinaryType)))
df.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", topic)
.save()
checkAnswer(
- createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
- Row("1") :: Row("2") :: Row("3") :: Row("4") :: Row("5") :: Nil)
+ createKafkaReader(topic).selectExpr(
+ "CAST(value as STRING) value",
+ "CAST(headers.x AS STRING)",
+ "CAST(headers.y AS STRING)"
+ ),
+ Row("1", "2", "2") :: Row("2", "3", "4") :: Row("3", "4", "6") ::
Row("4", "5", "8"
--- End diff --
nit: the closing `) ::` could be placed at the end of this line instead of at the start of the next line.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]