Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214640616
  
    --- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 ---
    @@ -59,14 +59,23 @@ class KafkaSinkSuite extends StreamTest with 
SharedSQLContext with KafkaTest {
         val topic = newTopic()
         testUtils.createTopic(topic)
         val df = Seq("1", "2", "3", "4", "5").map(v => (topic, 
v)).toDF("topic", "value")
    +      .withColumn("headers",
    +        map(lit("x"), 
col("value").plus(1).cast(IntegerType).cast(StringType).cast(BinaryType),
    +          lit("y"), 
col("value").multiply(2).cast(IntegerType).cast(StringType).cast(BinaryType)))
         df.write
           .format("kafka")
           .option("kafka.bootstrap.servers", testUtils.brokerAddress)
           .option("topic", topic)
           .save()
         checkAnswer(
    -      createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
    -      Row("1") :: Row("2") :: Row("3") :: Row("4") :: Row("5") :: Nil)
    +      createKafkaReader(topic).selectExpr(
    +        "CAST(value as STRING) value",
    +        "CAST(headers.x AS STRING)",
    +        "CAST(headers.y AS STRING)"
    +      ),
    +      Row("1", "2", "2") :: Row("2", "3", "4") :: Row("3", "4", "6") :: 
Row("4", "5", "8"
    --- End diff --
    
    nit: `) ::` could be added here instead of next line.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to