redsk commented on a change in pull request #26153: [SPARK-29500][SQL][SS] Support partition column when writing to Kafka
URL: https://github.com/apache/spark/pull/26153#discussion_r336456840
 
 

 ##########
 File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 ##########
 @@ -404,20 +420,85 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
       .save()
     checkAnswer(
       createKafkaReader(topic, includeHeaders = true).selectExpr(
-        "CAST(value as STRING) value", "headers"
+        "CAST(value as STRING) value", "headers", "partition"
       ),
-      Row("1", Seq(Row("a", "b".getBytes(UTF_8)))) ::
-        Row("2", Seq(Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8)))) ::
-        Row("3", Seq(Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8)))) ::
-        Row("4", null) ::
+      Row("1", Seq(Row("a", "b".getBytes(UTF_8))), 0) ::
+        Row("2", Seq(Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8))), 1) ::
+        Row("3", Seq(Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8))), 2) ::
+        Row("4", null, 3) ::
         Row("5", Seq(
           Row("j", "k".getBytes(UTF_8)),
           Row("j", "l".getBytes(UTF_8)),
-          Row("m", "n".getBytes(UTF_8)))) ::
+          Row("m", "n".getBytes(UTF_8))), 0) ::
         Nil
     )
   }
 
+  test("batch - partition column vs default Kafka partitioner") {
 
 Review comment:
   > Not sure why 100 partitions necessary?
   I just wanted to keep the test from passing by chance. I admit it's 
overkill, so I'll reduce it to 10.
   > Don't we need the following tests?
   See my other comment.
   >`.collect().toList.head` maybe not enough because it would be good to make 
sure the data is in one partition.
   The `.collect()` docstring says "Returns an array that contains all rows in this 
Dataset. Running collect requires moving all the data into the application's 
driver process, and doing so on a very large dataset can crash the driver 
process with OutOfMemoryError." Does `collect()` work on a per-partition basis? 
If so, maybe I could use `.coalesce(1)` before calling `collect()`.
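   Going by that docstring, `collect()` hands back every row as a local array, so one way to confirm the data sits in a single Kafka partition might be to select the `partition` column (as the batch test above already does) and assert on its distinct values. A minimal sketch in plain Scala, where `Record` and `distinctPartitions` are hypothetical names standing in for the collected rows and the check:

   ```scala
   // Hypothetical stand-in for rows obtained from something like
   // createKafkaReader(topic).selectExpr("CAST(value as STRING) value", "partition").collect()
   final case class Record(value: String, partition: Int)

   // Returns the set of Kafka partitions that the collected rows came from.
   def distinctPartitions(rows: Seq[Record]): Set[Int] =
     rows.map(_.partition).toSet

   // All rows written with an explicit partition of 3 should report exactly that partition.
   val rows = Seq(Record("1", 3), Record("2", 3), Record("3", 3))
   assert(distinctPartitions(rows) == Set(3))
   ```

   This checks the Kafka partition of each record rather than the Spark partitioning of the Dataset, so no `.coalesce(1)` would be needed for it.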
   > I think this test can be formed more simple.
   OK, that makes sense. I'll simplify it.
   > There are a couple of copy-pastes
   Do you mean blocks like this one?
   ```
   df.write
     .format("kafka")
     .option("kafka.bootstrap.servers", testUtils.brokerAddress)
     .option("topic", topic)
     .mode("append")
     .save()
   ```
   If so, they are also all over the file.

