HeartSaVioR commented on code in PR #48862:
URL: https://github.com/apache/spark/pull/48862#discussion_r1853308147
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -708,6 +732,52 @@ class TransformWithStateSuite extends StateStoreMetricsTest
)
}
+ test("transformWithState - timer duration should be reflected in metrics") {
+ val clock = new StreamManualClock
+ val inputData = MemoryStream[String]
+ val result = inputData.toDS()
+ .groupByKey(x => x)
+ .transformWithState(
+ new SleepingTimerProcessor, TimeMode.ProcessingTime(), OutputMode.Update())
+
+ testStream(result, OutputMode.Update())(
+ StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+ AddData(inputData, "a"),
+ AdvanceManualClock(1 * 1000),
+ // Side effect: timer scheduled for t = 1 + 10 = 11.
+ CheckNewAnswer(),
+ Execute { q =>
+ val metrics = q.lastProgress.stateOperators(0).customMetrics
+ assert(metrics.get("numRegisteredTimers") === 1)
+ assert(metrics.get("timerProcessingTimeMs") < 2000)
+ },
+
+ AddData(inputData, "b"),
+ AdvanceManualClock(1 * 1000),
+ // Side effect: timer scheduled for t = 2 + 10 = 12.
+ CheckNewAnswer(),
+ Execute { q =>
+ val metrics = q.lastProgress.stateOperators(0).customMetrics
+ assert(metrics.get("numRegisteredTimers") === 1)
Review Comment:
nit: this isn't the total but the number of timers registered in this batch, right? It's
"slightly" confusing, but revisiting it is out of scope for this PR.
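To make the distinction concrete, here is a hypothetical model. The class `TimerMetrics` and its methods are invented for illustration and are not Spark code. It assumes the metric is reset at the start of each micro-batch. With one new key per batch ("a", then "b"), a per-batch count stays at 1 while a running total would reach 2.

```scala
// Hypothetical model, not Spark code: contrasts a per-batch counter
// (what the reviewer suspects numRegisteredTimers reports) with a running total.
final class TimerMetrics {
  private var registeredThisBatch = 0L
  private var registeredTotal = 0L

  // Assumption: the per-batch counter is reset when a new micro-batch starts.
  def startBatch(): Unit = { registeredThisBatch = 0L }

  def registerTimer(): Unit = {
    registeredThisBatch += 1
    registeredTotal += 1
  }

  def inBatch: Long = registeredThisBatch
  def total: Long = registeredTotal
}

object TimerMetricsDemo {
  def main(args: Array[String]): Unit = {
    val m = new TimerMetrics
    m.startBatch(); m.registerTimer() // batch 1: key "a" registers a timer
    println(s"${m.inBatch} ${m.total}") // 1 1
    m.startBatch(); m.registerTimer() // batch 2: key "b" registers a timer
    println(s"${m.inBatch} ${m.total}") // 1 2
  }
}
```

Under that assumption, asserting `=== 1` in the second batch checks the per-batch value, which matches the test above.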
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -387,13 +401,12 @@ case class TransformWithStateExec(
// Return an iterator of all the rows generated by all the keys, such that when fully
// consumed, all the state updates will be committed by the state store
CompletionIterator[InternalRow, Iterator[InternalRow]](outputIterator, {
- // Note: Due to the iterator lazy execution, this metric also captures the time taken
- // by the upstream (consumer) operators in addition to the processing in this operator.
- allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)
+ allRemovalsTimeMs += timeTakenMs {
+ processorHandle.doTtlCleanup()
+ }
+
commitTimeMs += timeTakenMs {
if (isStreaming) {
- // clean up any expired user state
Review Comment:
Nice finding!
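The removed note about lazy execution, and the switch to wrapping only `doTtlCleanup()` in `timeTakenMs`, can be illustrated with a small standalone sketch. `timeTakenMs` and `withCompletion` below are hypothetical helpers written for this example. They are modeled loosely on the calls in the diff and on `CompletionIterator`, but they are not Spark's implementations. The point is that a duration measured from operator start until the completion callback also includes time spent by the downstream consumer, while a block timer measures only its own body.

```scala
import java.util.concurrent.TimeUnit.NANOSECONDS

object TimingSketch {
  // Hypothetical helper: runs `body` once and returns its duration in milliseconds.
  def timeTakenMs(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    NANOSECONDS.toMillis(System.nanoTime() - start)
  }

  // Hypothetical stand-in for CompletionIterator: runs `onComplete` once,
  // the first time hasNext reports that `it` is exhausted.
  def withCompletion[A](it: Iterator[A])(onComplete: => Unit): Iterator[A] =
    new Iterator[A] {
      private var done = false
      override def hasNext: Boolean = {
        val more = it.hasNext
        if (!more && !done) { done = true; onComplete }
        more
      }
      override def next(): A = it.next()
    }

  def main(args: Array[String]): Unit = {
    val startNs = System.nanoTime()
    var elapsedMs = -1L
    // Iterator.map is lazy: nothing runs until a consumer pulls elements.
    val output = withCompletion(Iterator(1, 2, 3).map(_ * 2)) {
      elapsedMs = NANOSECONDS.toMillis(System.nanoTime() - startNs)
    }
    // A slow downstream consumer; its sleeps end up inside elapsedMs as well.
    val consumed = output.map { x => Thread.sleep(50); x }.toList
    println(consumed) // List(2, 4, 6)
    println(s"measured until completion: $elapsedMs ms (includes consumer time)")
    // A block timer only covers its own body, like timing doTtlCleanup() alone.
    println(s"block-only timing: ${timeTakenMs(Thread.sleep(10))} ms")
  }
}
```

This is one reason why timing a specific step inside the completion callback gives a more meaningful metric than a start-to-completion span.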
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]