redsk commented on a change in pull request #26153: [SPARK-29500][SQL][SS] Support partition column when writing to Kafka
URL: https://github.com/apache/spark/pull/26153#discussion_r336456840
 
 

 ##########
 File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 ##########
 @@ -404,20 +420,85 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
       .save()
     checkAnswer(
       createKafkaReader(topic, includeHeaders = true).selectExpr(
-        "CAST(value as STRING) value", "headers"
+        "CAST(value as STRING) value", "headers", "partition"
       ),
-      Row("1", Seq(Row("a", "b".getBytes(UTF_8)))) ::
-        Row("2", Seq(Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8)))) ::
-        Row("3", Seq(Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8)))) ::
-        Row("4", null) ::
+      Row("1", Seq(Row("a", "b".getBytes(UTF_8))), 0) ::
+        Row("2", Seq(Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8))), 1) ::
+        Row("3", Seq(Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8))), 2) ::
+        Row("4", null, 3) ::
         Row("5", Seq(
           Row("j", "k".getBytes(UTF_8)),
           Row("j", "l".getBytes(UTF_8)),
-          Row("m", "n".getBytes(UTF_8)))) ::
+          Row("m", "n".getBytes(UTF_8))), 0) ::
         Nil
     )
   }
 
+  test("batch - partition column vs default Kafka partitioner") {
 
 Review comment:
   - "Not sure why 100 partitions necessary?": I just wanted to prevent the test from succeeding by chance. I admit it's overkill; I'll reduce it to 10.
   - "Don't we need the following tests?". See my other comment.
   - "`.collect().toList.head` maybe not enough because it would be good to make sure the data is in one partition." The `.collect()` docstring says "Returns an array that contains all rows in this Dataset. Running collect requires moving all the data into the application's driver process, and doing so on a very large dataset can crash the driver process with OutOfMemoryError." Does `collect()` work on a per-partition basis? If so, maybe I could use `.coalesce(1)` before calling `collect()`.
   - "I think this test can be formed more simple." OK, that makes sense; I'll simplify it.
   - "There are a couple of copy-pastes": do you refer to the
     ```scala
     df.write
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("topic", topic)
       .mode("append")
       .save()
     ```
     blocks? In that case, they are also all over the file.
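
   On the per-partition question: `collect()` always gathers the rows from all partitions into the driver, so it is not per-partition, and `.coalesce(1)` before `collect()` would not change which rows come back. One way to sidestep ordering concerns entirely is to read the topic back and assert on the `partition` metadata column. A minimal sketch, assuming this suite's `createKafkaReader` helper and `topic` variable:

   ```scala
   // Sketch: verify all written rows ended up in a single Kafka partition,
   // using the "partition" metadata column instead of collect() ordering.
   val partitions = createKafkaReader(topic, includeHeaders = false)
     .selectExpr("partition")
     .distinct()
     .collect()
     .map(_.getInt(0))
   // Exactly one distinct partition id means the data landed in one partition.
   assert(partitions.length === 1)
   ```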

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
