gaborgsomogyi commented on a change in pull request #23791: 
[SPARK-20597][SQL][SS][WIP] KafkaSourceProvider falls back on path as synonym 
for topic
URL: https://github.com/apache/spark/pull/23791#discussion_r260275520
 
 

 ##########
 File path: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 ##########
 @@ -160,76 +204,93 @@ class KafkaSinkSuite extends StreamTest with 
SharedSQLContext with KafkaTest {
     }
   }
 
-  test("streaming - write aggregation w/o topic field, with topic option") {
+  /**
+   * The purpose of this test is to ensure that the topic option overrides the
+   * topic path and topic field. We begin by writing some data that includes a
+   * topic field and value (e.g., 'foo') along with a topic option and topic
+   * path (e.g. 'bar'). Then when we read from the topic specified in the option
+   * we should see the data i.e., the data was written to the topic option, and
+   * not to the topic in the data e.g., foo
+   */
+  test("streaming - aggregation with topic field, path and topic option") {
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val writer = createKafkaWriter(
+      input.toDF().groupBy("value").count(),
+      withTopic = Some(topic),
+      withPath = Some(topic),
+      withOutputMode = Some(OutputMode.Update()))(
+      withSelectExpr = "'foo' as topic",
+      "CAST(value as STRING) key",
+      "CAST(count as STRING) value")
+
+    checkStreamAggregation(input, writer, topic)
+  }
+
+  test("streaming - write aggregation with topic and path option") {
     val input = MemoryStream[String]
     val topic = newTopic()
     testUtils.createTopic(topic)
 
     val writer = createKafkaWriter(
       input.toDF().groupBy("value").count(),
       withTopic = Some(topic),
+      withPath = Some(topic),
       withOutputMode = Some(OutputMode.Update()))(
       withSelectExpr = "CAST(value as STRING) key", "CAST(count as STRING) value")
 
-    val reader = createKafkaReader(topic)
-      .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
-      .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
-      .as[(Int, Int)]
+    checkStreamAggregation(input, writer, topic)
+  }
 
-    try {
-      input.addData("1", "2", "2", "3", "3", "3")
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
-      }
-      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
-      input.addData("1", "2", "3")
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
-      }
-      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
-    } finally {
-      writer.stop()
-    }
+  test("streaming - write aggregation with path option") {
 
 Review comment:
   Some tests are named `streaming - write aggregation...` while others are
 named `streaming - aggregation...`. Is this difference intentional?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to