This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 2221d3e [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run 2221d3e is described below commit 2221d3e0183140a0a98f6de92f84d2d924aab703 Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> AuthorDate: Wed Apr 8 16:59:39 2020 -0700 [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run ### What changes were proposed in this pull request? This patch fixes the behavior of ProgressReporter, which always overwrites the "updated" value of a state operator with 0 when there is no new data. That behavior is correct only when the state progress is copied from the "previous" executed plan, i.e. when no batch has been run. (A nonzero "updated" value would be odd if no batch ran, so it was correct in that case.) It used to be safe to assume that no data meant no batch, but SPARK-24156 allows a batch to run with no new data when Spark needs to handle the watermark. After the patch, the value is overwritten only if both conditions are met: 1) there is no new data, and 2) no batch has been executed. ### Why are the changes needed? Currently Spark doesn't report correct state operator metrics when an empty batch is run; this patch fixes that. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Modified UT. Note that FlatMapGroupsWithState increases the value of "updated" when state rows are removed.
Also manually tested with the query below (it is not easy to test with spark-shell, since you'll run into a closure issue in spark-shell when working with the state function): > query ``` case class RunningCount(count: Long) object TestFlatMapGroupsWithState { def main(args: Array[String]): Unit = { import org.apache.spark.sql.SparkSession val ss = SparkSession .builder() .appName("TestFlatMapGroupsWithState") .getOrCreate() ss.conf.set("spark.sql.shuffle.partitions", "5") import ss.implicits._ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { if (state.hasTimedOut) { // End users are not restricted to remove the state here - they can update the // state as well. For example, event time session window would have list of // sessions here and it cannot remove entire state. state.update(RunningCount(-1)) Iterator((key, "-1")) } else { val count = state.getOption.map(_.count).getOrElse(0L) + values.size state.update(RunningCount(count)) state.setTimeoutDuration("1 seconds") Iterator((key, count.toString)) } } implicit val sqlContext = ss.sqlContext val inputData = MemoryStream[String] val result = inputData .toDF() .as[String] .groupByKey { v => v } .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout())(stateFunc) val query = result .writeStream .format("memory") .option("queryName", "test") .outputMode("append") .trigger(Trigger.ProcessingTime("5 second")) .start() Thread.sleep(1000) var chIdx: Long = 0 while (true) { (chIdx to chIdx + 4).map { idx => inputData.addData(idx.toString) } chIdx += 5 // intentionally sleep much more than trigger to enable "empty" batch Thread.sleep(10 * 1000) } } } ``` > before the patch (batch 3 which was an "empty" batch) ``` { "id":"de945a5c-882b-4dae-aa58-cb8261cbaf9e", "runId":"f1eb6d0d-3cd5-48b2-a03b-5e989b6c151b", "name":"test", "timestamp":"2019-11-18T07:00:25.005Z", "batchId":3, "numInputRows":0, "inputRowsPerSecond":0.0, "processedRowsPerSecond":0.0, "durationMs":{ 
"addBatch":1664, "getBatch":0, "latestOffset":0, "queryPlanning":29, "triggerExecution":1789, "walCommit":51 }, "stateOperators":[ { "numRowsTotal":10, "numRowsUpdated":0, "memoryUsedBytes":5130, "customMetrics":{ "loadedMapCacheHitCount":15, "loadedMapCacheMissCount":0, "stateOnCurrentVersionSizeBytes":2722 } } ], "sources":[ { "description":"MemoryStream[value#1]", "startOffset":9, "endOffset":9, "numInputRows":0, "inputRowsPerSecond":0.0, "processedRowsPerSecond":0.0 } ], "sink":{ "description":"MemorySink", "numOutputRows":5 } } ``` > after the patch (batch 3 which was an "empty" batch) ``` { "id":"7cb41623-6b9a-408e-ae02-6796ec636fa0", "runId":"17847710-ddfe-45f5-a7ab-b160e149382f", "name":"test", "timestamp":"2019-11-18T07:02:25.005Z", "batchId":3, "numInputRows":0, "inputRowsPerSecond":0.0, "processedRowsPerSecond":0.0, "durationMs":{ "addBatch":1196, "getBatch":0, "latestOffset":0, "queryPlanning":30, "triggerExecution":1333, "walCommit":46 }, "stateOperators":[ { "numRowsTotal":10, "numRowsUpdated":5, "memoryUsedBytes":5130, "customMetrics":{ "loadedMapCacheHitCount":15, "loadedMapCacheMissCount":0, "stateOnCurrentVersionSizeBytes":2722 } } ], "sources":[ { "description":"MemoryStream[value#1]", "startOffset":9, "endOffset":9, "numInputRows":0, "inputRowsPerSecond":0.0, "processedRowsPerSecond":0.0 } ], "sink":{ "description":"MemorySink", "numOutputRows":5 } } ``` Before the patch, "numRowsUpdated" in "stateOperators" is `0`, which is wrong because the state function updates the state of each timed-out key. After the patch, "numRowsUpdated" in "stateOperators" correctly reports `5`. Closes #25987 from HeartSaVioR/SPARK-29314. 
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> Signed-off-by: Burak Yavuz <brk...@gmail.com> (cherry picked from commit ca2ba4fe647cd60668410b68014a3991ad7fd5c9) Signed-off-by: Burak Yavuz <brk...@gmail.com> --- .../spark/sql/execution/streaming/ProgressReporter.scala | 12 ++++++------ .../spark/sql/streaming/FlatMapGroupsWithStateSuite.scala | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index d1086cd..0dff1c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -154,7 +154,7 @@ trait ProgressReporter extends Logging { assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null) currentTriggerEndTimestamp = triggerClock.getTimeMillis() - val executionStats = extractExecutionStats(hasNewData) + val executionStats = extractExecutionStats(hasNewData, hasExecuted) val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / MILLIS_PER_SECOND @@ -215,26 +215,26 @@ trait ProgressReporter extends Logging { } /** Extract statistics about stateful operators from the executed query plan. */ - private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = { + private def extractStateOperatorMetrics(hasExecuted: Boolean): Seq[StateOperatorProgress] = { if (lastExecution == null) return Nil - // lastExecution could belong to one of the previous triggers if `!hasNewData`. + // lastExecution could belong to one of the previous triggers if `!hasExecuted`. // Walking the plan again should be inexpensive. 
lastExecution.executedPlan.collect { case p if p.isInstanceOf[StateStoreWriter] => val progress = p.asInstanceOf[StateStoreWriter].getProgress() - if (hasNewData) progress else progress.copy(newNumRowsUpdated = 0) + if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0) } } /** Extracts statistics from the most recent query execution. */ - private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { + private def extractExecutionStats(hasNewData: Boolean, hasExecuted: Boolean): ExecutionStats = { val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty val watermarkTimestamp = if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) else Map.empty[String, String] // SPARK-19378: Still report metrics even though no data was processed while reporting progress. - val stateOperators = extractStateOperatorMetrics(hasNewData) + val stateOperators = extractStateOperatorMetrics(hasExecuted) if (!hasNewData) { return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index d36c64f..b04f8b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -798,7 +798,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { } }, CheckNewAnswer(("c", "-1")), - assertNumStateRows(total = 0, updated = 0) + assertNumStateRows(total = 0, updated = 1) ) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org