HeartSaVioR commented on a change in pull request #26201: [SPARK-29543][SS][UI] 
Init structured streaming ui
URL: https://github.com/apache/spark/pull/26201#discussion_r348352731
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ##########
 @@ -1102,6 +1102,84 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
     }
   }
 
+  test("stream query summary with mixed batch and streaming V1 sources") {
+    val streamingTriggerDF = spark.createDataset(1 to 10).toDF
+    val streamingInputDF = 
createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
+    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> 
"2")).toDF("value", "anotherValue")
+
+    // Trigger input has 10 rows, static input has 2 rows,
+    // therefore after the first trigger, the calculated input rows should be 
10
+    val querySummary = getQuerySummary(streamingInputDF.join(staticInputDF, 
"value"))
+    assert(querySummary.getMetric(QuerySummary.TOTAL_INPUT_RECORDS, -1) === 10)
+  }
+
+  test("stream query summary with trigger having data for both of two V2 
sources") {
+    val streamInput1 = MemoryStream[Int]
+    val streamInput2 = MemoryStream[Int]
+
+    testStream(streamInput1.toDF().union(streamInput2.toDF()))(
+      AddData(streamInput1, 1, 2, 3),
+      AddData(streamInput2, 4, 5, 6, 7),
+      CheckLastBatch(1, 2, 3, 4, 5, 6, 7),
+      AssertOnQuery { q =>
+        val querySummary = q.getQuerySummary
+        assert(querySummary.getMetric(QuerySummary.TOTAL_INPUT_RECORDS, -1L) 
== 7L)
+        true
+      },
+      AddData(streamInput2, 8, 9),
+      CheckLastBatch(8, 9),
+      AssertOnQuery { q =>
+        val querySummary = q.getQuerySummary
+        assert(querySummary.getMetric(QuerySummary.TOTAL_INPUT_RECORDS, -1L) 
== 9L)
+        true
+      }
+    )
+  }
+
+  test("stream query summary with trigger having data for only one of two V2 
sources") {
+    val streamInput1 = MemoryStream[Int]
+    val streamInput2 = MemoryStream[Int]
+
+    testStream(streamInput1.toDF().union(streamInput2.toDF()))(
+      AddData(streamInput1, 1, 2, 3),
+      CheckLastBatch(1, 2, 3),
+      AssertOnQuery { q =>
+        val querySummary = q.getQuerySummary
+        assert(querySummary.getMetric(QuerySummary.TOTAL_INPUT_RECORDS, -1L) 
== 3L)
+        true
+      },
+      AddData(streamInput2, 4, 5),
+      CheckLastBatch(4, 5),
+      AssertOnQuery { q =>
+        val querySummary = q.getQuerySummary
+        assert(querySummary.getMetric(QuerySummary.TOTAL_INPUT_RECORDS, -1L) 
== 5L)
+        true
+      }
+    )
+  }
+
+  test("stream query summary with mixed batch and streaming V2 sources") {
+
 
 Review comment:
   nit: remove unnecessary empty line.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to