This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0da463e [SPARK-35880][SS] Track the duplicates dropped count in dedupe operator 0da463e is described below commit 0da463e59304954515f003f98574c740b47b89fb Author: Venki Korukanti <venki.koruka...@gmail.com> AuthorDate: Mon Jun 28 13:21:00 2021 +0900 [SPARK-35880][SS] Track the duplicates dropped count in dedupe operator ### What changes were proposed in this pull request? Add a metric to track the number of duplicate rows dropped from the input by the streaming deduplication operator. Also introduce a `StatefulOperatorCustomMetric` trait to allow stateful operators to output their own unique metrics in `StateOperatorProgress.customMetrics` in `StreamingQueryProgress`. ### Why are the changes needed? 1. Having the count of dropped duplicates helps monitor and debug incorrect-results issues, and find the reasons for state size increases in the dedupe operator. 2. The new API `StatefulOperatorCustomMetric` allows stateful operators to expose their own unique metrics in `StateOperatorProgress.customMetrics` in `StreamingQueryProgress`. ### Does this PR introduce _any_ user-facing change? Yes. For the deduplication stateful operator, a new metric `numDroppedDuplicateRows` is shown in `StateOperatorProgress` within `StreamingQueryProgress`. Example `StreamingQueryProgress` output in JSON form: ``` { "id" : "510be3cd-a955-4faf-8456-d97c78d39af5", "runId" : "c170c4cd-04cb-4a28-b054-74020e3998e1", ... , "stateOperators" : [ { "numRowsTotal" : 1, "numRowsUpdated" : 1, "numRowsDroppedByWatermark" : 0, "customMetrics" : { "loadedMapCacheHitCount" : 0, "loadedMapCacheMissCount" : 0, "numDroppedDuplicateRows" : 0, "stateOnCurrentVersionSizeBytes" : 392 } }], ... } ``` ### How was this patch tested? Existing unit tests for regression, plus a new unit test. Closes #33065 from vkorukanti/SPARK-35880. 
Authored-by: Venki Korukanti <venki.koruka...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../execution/streaming/statefulOperators.scala | 43 ++++++++++++- .../sql/streaming/StateStoreMetricsTest.scala | 70 +++++++++++++++++----- .../streaming/StreamingDeduplicationSuite.scala | 49 +++++++++++++++ 3 files changed, 143 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index f0527c1..41dcfde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit._ import scala.collection.JavaConverters._ +import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -66,6 +67,24 @@ trait StatefulOperator extends SparkPlan { } } +/** + * Custom stateful operator metric definition to allow operators to expose their own custom metrics. + * Also provides [[SQLMetric]] instance to show the metric in UI and accumulate it at the query + * level. + */ +trait StatefulOperatorCustomMetric { + def name: String + def desc: String + def createSQLMetric(sparkContext: SparkContext): SQLMetric +} + +/** Custom stateful operator metric for simple "count" gauge */ +case class StatefulOperatorCustomSumMetric(name: String, desc: String) + extends StatefulOperatorCustomMetric { + override def createSQLMetric(sparkContext: SparkContext): SQLMetric = + SQLMetrics.createMetric(sparkContext, desc) +} + /** An operator that reads from a StateStore. 
*/ trait StateStoreReader extends StatefulOperator { override lazy val metrics = Map( @@ -75,7 +94,7 @@ trait StateStoreReader extends StatefulOperator { /** An operator that writes to a StateStore. */ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => - override lazy val metrics = Map( + override lazy val metrics = statefulOperatorCustomMetrics ++ Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext, "number of rows which are dropped by watermark"), @@ -92,7 +111,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => * the driver after this SparkPlan has been executed and metrics have been updated. */ def getProgress(): StateOperatorProgress = { - val customMetrics = stateStoreCustomMetrics + val customMetrics = (stateStoreCustomMetrics ++ statefulOperatorCustomMetrics) .map(entry => entry._1 -> longMetric(entry._1).value) val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] = @@ -130,6 +149,19 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => }.toMap } + /** + * Set of stateful operator custom metrics. These are captured as part of the generic + * key-value map [[StateOperatorProgress.customMetrics]]. + * Stateful operators can extend this method to provide their own unique custom metrics. 
+ */ + protected def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = Nil + + private def statefulOperatorCustomMetrics: Map[String, SQLMetric] = { + customStatefulOperatorMetrics.map { + metric => (metric.name, metric.createSQLMetric(sparkContext)) + }.toMap + } + protected def applyRemovingRowsOlderThanWatermark( iter: Iterator[InternalRow], predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow] = { @@ -468,11 +500,11 @@ case class StreamingDeduplicateExec( Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false")) { (store, iter) => val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) val numOutputRows = longMetric("numOutputRows") - val numTotalStateRows = longMetric("numTotalStateRows") val numUpdatedStateRows = longMetric("numUpdatedStateRows") val allUpdatesTimeMs = longMetric("allUpdatesTimeMs") val allRemovalsTimeMs = longMetric("allRemovalsTimeMs") val commitTimeMs = longMetric("commitTimeMs") + val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows") val baseIterator = watermarkPredicateForData match { case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate) @@ -492,6 +524,7 @@ case class StreamingDeduplicateExec( true } else { // Drop duplicated rows + numDroppedDuplicateRows += 1 false } } @@ -509,6 +542,10 @@ case class StreamingDeduplicateExec( override def outputPartitioning: Partitioning = child.outputPartitioning + override def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = { + Seq(StatefulOperatorCustomSumMetric("numDroppedDuplicateRows", "number of duplicates dropped")) + } + override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = { eventTimeWatermark.isDefined && newMetadata.batchWatermarkMs > eventTimeWatermark.get } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala index 
be83f0e..5073723 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala @@ -32,25 +32,14 @@ trait StateStoreMetricsTest extends StreamTest { def assertNumStateRows( total: Seq[Long], updated: Seq[Long], - droppedByWatermark: Seq[Long]): AssertOnQuery = + droppedByWatermark: Seq[Long]): AssertOnQuery = { AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated" + s", rows dropped by watermark = $droppedByWatermark") { q => // This assumes that the streaming query will not make any progress while the eventually // is being executed. eventually(timeout(streamingTimeout)) { - val recentProgress = q.recentProgress - require(recentProgress.nonEmpty, "No progress made, cannot check num state rows") - require(recentProgress.length < spark.sessionState.conf.streamingProgressRetention, - "This test assumes that all progresses are present in q.recentProgress but " + - "some may have been dropped due to retention limits") - - if (q.ne(lastQuery)) lastCheckedRecentProgressIndex = -1 - lastQuery = q - - val numStateOperators = recentProgress.last.stateOperators.length - val progressesSinceLastCheck = recentProgress - .slice(lastCheckedRecentProgressIndex + 1, recentProgress.length) - .filter(_.stateOperators.length == numStateOperators) + val (progressesSinceLastCheck, lastCheckedProgressIndex, numStateOperators) = + retrieveProgressesSinceLastCheck(q) val allNumUpdatedRowsSinceLastCheck = progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated)) @@ -61,7 +50,7 @@ trait StateStoreMetricsTest extends StreamTest { lazy val debugString = "recent progresses:\n" + progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n") - val numTotalRows = recentProgress.last.stateOperators.map(_.numRowsTotal) + val numTotalRows = progressesSinceLastCheck.last.stateOperators.map(_.numRowsTotal) assert(numTotalRows === total, s"incorrect 
total rows, $debugString") val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators) @@ -72,10 +61,36 @@ trait StateStoreMetricsTest extends StreamTest { assert(numRowsDroppedByWatermark === droppedByWatermark, s"incorrect dropped rows by watermark, $debugString") - lastCheckedRecentProgressIndex = recentProgress.length - 1 + advanceLastCheckedRecentProgressIndex(lastCheckedProgressIndex) + } + true + } + } + + /** AssertOnQuery to verify the given state operator's custom metric has expected value */ + def assertStateOperatorCustomMetric( + metric: String, expected: Long, operatorIndex: Int = 0): AssertOnQuery = { + AssertOnQuery(s"Check metrics $metric has value $expected") { q => + eventually(timeout(streamingTimeout)) { + val (progressesSinceLastCheck, lastCheckedProgressIndex, numStateOperators) = + retrieveProgressesSinceLastCheck(q) + assert(operatorIndex < numStateOperators, s"Invalid operator Index: $operatorIndex") + + val allCustomMetricValuesSinceLastCheck = progressesSinceLastCheck + .map(_.stateOperators(operatorIndex).customMetrics.get(metric)) + .map(Long2long) + + lazy val debugString = "recent progresses:\n" + + progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n") + + assert(allCustomMetricValuesSinceLastCheck.sum === expected, + s"incorrect custom metric ($metric), $debugString") + + advanceLastCheckedRecentProgressIndex(lastCheckedProgressIndex) } true } + } def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery = { assert(total.length === updated.length) @@ -96,4 +111,27 @@ trait StateStoreMetricsTest extends StreamTest { "Arrays are of different lengths:\n" + arraySeq.map(_.toSeq).mkString("\n")) (0 until arrayLength).map { index => arraySeq.map(_.apply(index)).sum } } + + def retrieveProgressesSinceLastCheck( + execution: StreamExecution): (Array[StreamingQueryProgress], Int, Int) = { + val recentProgress = execution.recentProgress + require(recentProgress != null, "No progress made") + 
require(recentProgress.length < spark.sessionState.conf.streamingProgressRetention, + "This test assumes that all progresses are present in q.recentProgress but " + + "some may have been dropped due to retention limits") + + if (execution.ne(lastQuery)) lastCheckedRecentProgressIndex = -1 + lastQuery = execution + + val numStateOperators = recentProgress.last.stateOperators.length + val recentProgresses = recentProgress + .slice(lastCheckedRecentProgressIndex + 1, recentProgress.length) + .filter(_.stateOperators.length == numStateOperators) + + (recentProgresses, recentProgress.length - 1, recentProgresses.last.stateOperators.length) + } + + def advanceLastCheckedRecentProgressIndex(newLastCheckedRecentProgressIndex: Int): Unit = { + lastCheckedRecentProgressIndex = newLastCheckedRecentProgressIndex + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala index ac9cd1a..dc2e787 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala @@ -332,4 +332,53 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { } } + test("SPARK-35880: custom metric numDroppedDuplicateRows in state operator progress") { + val dedupeInputData = MemoryStream[(String, Int)] + val dedupe = dedupeInputData.toDS().dropDuplicates("_1") + + testStream(dedupe, Append)( + AddData(dedupeInputData, "a" -> 1), + CheckLastBatch("a" -> 1), + assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 0), + + AddData(dedupeInputData, "a" -> 2, "b" -> 3), + CheckLastBatch("b" -> 3), + assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1), + + AddData(dedupeInputData, "a" -> 5, "b" -> 2, "c" -> 9), + CheckLastBatch("c" -> 9), + assertStateOperatorCustomMetric("numDroppedDuplicateRows", 
expected = 2) + ) + + // with watermark + val dedupeWithWMInputData = MemoryStream[Int] + val dedupeWithWatermark = dedupeWithWMInputData.toDS() + .withColumn("eventTime", timestamp_seconds($"value")) + .withWatermark("eventTime", "10 seconds") + .dropDuplicates() + .select($"eventTime".cast("long").as[Long]) + + testStream(dedupeWithWatermark, Append)( + AddData(dedupeWithWMInputData, (1 to 5).flatMap(_ => (10 to 15)): _*), + CheckAnswer(10 to 15: _*), + assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 24), + + AddData(dedupeWithWMInputData, 14), + CheckNewAnswer(), + assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1), + + // Advance watermark to 15 secs, no-data-batch drops rows <= 15 + AddData(dedupeWithWMInputData, 25), + CheckNewAnswer(25), + assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 0), + + AddData(dedupeWithWMInputData, 10), // Should not emit anything as data less than watermark + CheckNewAnswer(), + assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 0), + + AddData(dedupeWithWMInputData, 26, 26), + CheckNewAnswer(26), + assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1) + ) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org