HeartSaVioR commented on a change in pull request #33091:
URL: https://github.com/apache/spark/pull/33091#discussion_r660321229



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
##########
@@ -127,12 +132,26 @@ case class FlatMapGroupsWithStateExec(
             iter
         }
 
+        val newDataProcessorIter =
+          CompletionIterator[InternalRow, Iterator[InternalRow]](
+            processor.processNewData(filteredIter), {
+            // Once the input is processed, mark the start time for timeout 
processing to measure
+            // it separately from the overall processing time.
+            timeoutProcessingStartTimeNs = System.nanoTime
+          })
+
+        val timeoutProcessorIter =
+          CompletionIterator[InternalRow, 
Iterator[InternalRow]](processor.processTimedOutState(), {
+            // Note: `timeoutLatencyMs` also includes the time the parent 
operator took for
+            // processing output returned through iterator.
+            timeoutLatencyMs += (System.nanoTime - 
timeoutProcessingStartTimeNs)

Review comment:
       We need to convert to milliseconds here: `System.nanoTime` differences are in nanoseconds, but they are being added to `timeoutLatencyMs`.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
##########
@@ -1122,6 +1106,43 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest {
       spark.createDataset(Seq(("a", 2), ("b", 1))).toDF)
   }
 
+  test("SPARK-35896: metrics in StateOperatorProgress are output correctly") {
+    val inputData = MemoryStream[(String, Int)]
+    val result =
+      inputData.toDS
+        .select($"_1".as("key"), timestamp_seconds($"_2").as("eventTime"))
+        .withWatermark("eventTime", "10 seconds")
+        .as[(String, Long)]
+        .groupByKey(_._1)
+        .flatMapGroupsWithState(Update, EventTimeTimeout)(sampleTestFunction)
+
+    testStream(result, Update)(
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"3")),
+
+      AddData(inputData, ("a", 11), ("a", 13), ("a", 15)),
+      // Max event time = 15. Timeout timestamp for "a" = 15 + 5 = 20. 
Watermark = 15 - 10 = 5.
+      CheckNewAnswer(("a", 15)),  // Output = max event time of a
+      assertNumStateRows(
+        total = Seq(1), updated = Seq(1), droppedByWatermark = Seq(0), removed 
= Some(Seq(0))),
+
+      AddData(inputData, ("a", 4)),       // Add data older than watermark for 
"a"
+      CheckNewAnswer(),                   // No output as data should get 
filtered by watermark
+      assertStateOperatorProgressMetric(

Review comment:
       The value of `numStateStores` is actually confusing. There are two different ways to interpret this number, and someone may expect the correct value to be 1.
   
   We should explain that the number counts every state store instance initialized across all tasks. Otherwise, people who interpret it as described above may think this is a bug.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
##########
@@ -58,9 +59,17 @@ trait StateStoreMetricsTest extends StreamTest {
 
         val numRowsDroppedByWatermark = 
arraySum(allNumRowsDroppedByWatermarkSinceLastCheck,
           numStateOperators)
+

Review comment:
       nit: let's leave this as it was (please revert the added blank line).

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
##########
@@ -737,7 +761,7 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest {
       StartStream(),
       AddData(inputData, "a", "b"), // should remove state for "a" and not 
return anything for a
       CheckNewAnswer(("b", "2")),
-      assertNumStateRows(total = 1, updated = 2),
+      assertNumStateRows(total = 1, updated = 1),

Review comment:
       Could we please keep checking the 1 here? I guess it's related to "removal". The same applies to the changes below.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to