This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8ab2a0c [SPARK-31278][SS] Fix StreamingQuery output rows metric 8ab2a0c is described below commit 8ab2a0c5f23a59c00a9b4191afd976af50d913ba Author: Burak Yavuz <brk...@gmail.com> AuthorDate: Tue Apr 7 17:17:47 2020 -0700 [SPARK-31278][SS] Fix StreamingQuery output rows metric ### What changes were proposed in this pull request? In Structured Streaming, we provide progress updates every 10 seconds when a stream doesn't have any new data upstream. When providing this progress though, we zero out the input information but not the output information. This PR fixes that bug. ### Why are the changes needed? Fixes a bug around incorrect metrics ### Does this PR introduce any user-facing change? Fixes a bug in the metrics ### How was this patch tested? New regression test Closes #28040 from brkyvz/sinkMetrics. Lead-authored-by: Burak Yavuz <brk...@gmail.com> Co-authored-by: Burak Yavuz <bu...@databricks.com> Signed-off-by: Burak Yavuz <brk...@gmail.com> --- .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 2 +- .../execution/streaming/MicroBatchExecution.scala | 3 +- .../sql/execution/streaming/ProgressReporter.scala | 32 ++++++---- .../sql/streaming/StreamingAggregationSuite.scala | 71 ++++++++++++++-------- .../streaming/StreamingDeduplicationSuite.scala | 3 +- .../StreamingQueryStatusAndProgressSuite.scala | 1 + 6 files changed, 73 insertions(+), 39 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 5c8c5b1..4e808a5 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -314,7 +314,7 @@ class KafkaSinkMicroBatchStreamingSuite extends KafkaSinkStreamingSuiteBase { try { input.addData("1", "2", "3") verifyResult(writer) { - assert(writer.lastProgress.sink.numOutputRows == 3L) + assert(writer.recentProgress.exists(_.sink.numOutputRows == 3L)) } } finally { writer.stop() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 45a2ce1..e022bfb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -226,7 +226,8 @@ class MicroBatchExecution( } } - finishTrigger(currentBatchHasNewData) // Must be outside reportTimeTaken so it is recorded + // Must be outside reportTimeTaken so it is recorded + finishTrigger(currentBatchHasNewData, isCurrentBatchConstructed) // Signal waiting threads. Note this must be after finishTrigger() to ensure all // activities (progress generation, etc.) have completed before signaling. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index feb151a..d1086cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -85,8 +85,8 @@ trait ProgressReporter extends Logging { private val noDataProgressEventInterval = sparkSession.sessionState.conf.streamingNoDataProgressEventInterval - // The timestamp we report an event that has no input data - private var lastNoDataProgressEventTime = Long.MinValue + // The timestamp we report an event that has not executed anything + private var lastNoExecutionProgressEventTime = Long.MinValue private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) @@ -142,8 +142,15 @@ trait ProgressReporter extends Logging { logInfo(s"Streaming query made progress: $newProgress") } - /** Finalizes the query progress and adds it to list of recent status updates. */ - protected def finishTrigger(hasNewData: Boolean): Unit = { + /** + * Finalizes the query progress and adds it to list of recent status updates. + * + * @param hasNewData Whether the sources of this stream had new data for this trigger. + * @param hasExecuted Whether any batch was executed during this trigger. Streaming queries that + * perform stateful aggregations with timeouts can still run batches even + * though the sources don't have any new data. + */ + protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = { assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null) currentTriggerEndTimestamp = triggerClock.getTimeMillis() @@ -170,9 +177,12 @@ trait ProgressReporter extends Logging { ) } - val sinkProgress = SinkProgress( - sink.toString, - sinkCommitProgress.map(_.numOutputRows)) + val sinkOutput = if (hasExecuted) { + sinkCommitProgress.map(_.numOutputRows) + } else { + sinkCommitProgress.map(_ => 0L) + } + val sinkProgress = SinkProgress(sink.toString, sinkOutput) val observedMetrics = extractObservedMetrics(hasNewData, lastExecution) val newProgress = new StreamingQueryProgress( @@ -189,14 +199,14 @@ trait ProgressReporter extends Logging { sink = sinkProgress, observedMetrics = new java.util.HashMap(observedMetrics.asJava)) - if (hasNewData) { + if (hasExecuted) { // Reset noDataEventTimestamp if we processed any data - lastNoDataProgressEventTime = Long.MinValue + lastNoExecutionProgressEventTime = Long.MinValue updateProgress(newProgress) } else { val now = triggerClock.getTimeMillis() - if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) { - lastNoDataProgressEventTime = now + if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) { + lastNoExecutionProgressEventTime = now updateProgress(newProgress) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 7413553..85e1b85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -202,47 +202,68 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { } } - def stateOperatorProgresses: Seq[StateOperatorProgress] = { - val operatorProgress = mutable.ArrayBuffer[StateOperatorProgress]() - var progress = query.recentProgress.last - - operatorProgress ++= progress.stateOperators.map { op => op.copy(op.numRowsUpdated) } - if (progress.numInputRows == 0) { - // empty batch, merge metrics from previous batch as well - progress = query.recentProgress.takeRight(2).head - operatorProgress.zipWithIndex.foreach { case (sop, index) => - // "numRowsUpdated" should be merged, as it could be updated in both batches. - // (for now it is only updated from previous batch, but things can be changed.) - // other metrics represent current status of state so picking up the latest values. - val newOperatorProgress = sop.copy( - sop.numRowsUpdated + progress.stateOperators(index).numRowsUpdated) - operatorProgress(index) = newOperatorProgress - } - } + // Pick the latest progress that actually ran a batch + def lastExecutedBatch: StreamingQueryProgress = { + query.recentProgress.filter(_.durationMs.containsKey("addBatch")).last + } - operatorProgress + def stateOperatorProgresses: Seq[StateOperatorProgress] = { + lastExecutedBatch.stateOperators } } + val clock = new StreamManualClock() + testStream(aggWithWatermark)( + // batchId 0 AddData(inputData, 15), - CheckAnswer(), // watermark = 5 + StartStream(Trigger.ProcessingTime("interval 1 second"), clock), + CheckAnswer(), // watermark = 0 AssertOnQuery { _.stateNodes.size === 1 }, AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 }, AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 }, + AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 }, + + // batchId 1 without data + AdvanceManualClock(1000L), // watermark = 5 + Execute { q => // wait for the no data batch to complete + eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 1) } + }, + CheckAnswer(), + AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, + AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 }, + AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 }, + AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 }, + + // batchId 2 with data AddData(inputData, 10, 12, 14), - CheckAnswer(), // watermark = 5 - AssertOnQuery { _.stateNodes.size === 1 }, + AdvanceManualClock(1000L), // watermark = 5 + CheckAnswer(), AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 }, AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 }, + AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 }, + + // batchId 3 with data AddData(inputData, 25), - CheckAnswer((10, 3)), // watermark = 15 - AssertOnQuery { _.stateNodes.size === 1 }, - AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 }, + AdvanceManualClock(1000L), // watermark = 5 + CheckAnswer(), + AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 }, - AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 } + AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 }, + AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 }, + + // batchId 4 without data + AdvanceManualClock(1000L), // watermark = 15 + Execute { q => // wait for the no data batch to complete + eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 4) } + }, + CheckAnswer((10, 3)), + AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 }, + AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 }, + AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 }, + AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 1 } ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala index cfd7204..f63778a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala @@ -280,7 +280,8 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { { // State should have been cleaned if flag is set, otherwise should not have been cleaned if (flag) assertNumStateRows(total = 1, updated = 1) else assertNumStateRows(total = 7, updated = 1) - } + }, + AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 6f00b52..08b3644 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -241,6 +241,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { assert(nextProgress.numInputRows === 0) assert(nextProgress.stateOperators.head.numRowsTotal === 2) assert(nextProgress.stateOperators.head.numRowsUpdated === 0) + assert(nextProgress.sink.numOutputRows === 0) } } finally { query.stop() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org