This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0da463e  [SPARK-35880][SS] Track the duplicates dropped count in 
dedupe operator
0da463e is described below

commit 0da463e59304954515f003f98574c740b47b89fb
Author: Venki Korukanti <venki.koruka...@gmail.com>
AuthorDate: Mon Jun 28 13:21:00 2021 +0900

    [SPARK-35880][SS] Track the duplicates dropped count in dedupe operator
    
    ### What changes were proposed in this pull request?
    
    Add a metric to track the number of duplicate rows dropped from the input 
by the streaming deduplication operator. Also introduce a 
`StatefulOperatorCustomMetric` to allow stateful operators to output their own 
unique metrics in `StateOperatorProgress.customMetrics` in 
`StreamingQueryProgress`.
    
    ### Why are the changes needed?
    
    1. Having the duplicates dropped count helps monitor and debug incorrect 
results issues or find reasons for state size increases in the dedupe operator.
    2. New API `StatefulOperatorCustomMetric` allows stateful operators to 
expose their own unique metrics in `StateOperatorProgress.customMetrics` in 
`StreamingQueryProgress`
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. For the deduplication stateful operator, a new metric 
`numDroppedDuplicateRows` is shown in `StateOperatorProgress` within 
`StreamingQueryProgress`. Example `StreamingQueryProgress` output in JSON form:
    
    ```
    {
      "id" : "510be3cd-a955-4faf-8456-d97c78d39af5",
      "runId" : "c170c4cd-04cb-4a28-b054-74020e3998e1",
      ...
      ,
      "stateOperators" : [ {
        "numRowsTotal" : 1,
        "numRowsUpdated" : 1,
        "numRowsDroppedByWatermark" : 0,
        "customMetrics" : {
          "loadedMapCacheHitCount" : 0,
          "loadedMapCacheMissCount" : 0,
          "numDroppedDuplicateRows" : 0,
          "stateOnCurrentVersionSizeBytes" : 392
        }
      }],
      ...
    }
    ```
    
    ### How was this patch tested?
    
    Existing UTs for regression and added a UT.
    
    Closes #33065 from vkorukanti/SPARK-35880.
    
    Authored-by: Venki Korukanti <venki.koruka...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../execution/streaming/statefulOperators.scala    | 43 ++++++++++++-
 .../sql/streaming/StateStoreMetricsTest.scala      | 70 +++++++++++++++++-----
 .../streaming/StreamingDeduplicationSuite.scala    | 49 +++++++++++++++
 3 files changed, 143 insertions(+), 19 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index f0527c1..41dcfde 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit._
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -66,6 +67,24 @@ trait StatefulOperator extends SparkPlan {
   }
 }
 
+/**
+ * Custom stateful operator metric definition to allow operators to expose 
their own custom metrics.
+ * Also provides [[SQLMetric]] instance to show the metric in UI and 
accumulate it at the query
+ * level.
+ */
+trait StatefulOperatorCustomMetric {
+  def name: String
+  def desc: String
+  def createSQLMetric(sparkContext: SparkContext): SQLMetric
+}
+
+/** Custom stateful operator metric for simple "count" gauge */
+case class StatefulOperatorCustomSumMetric(name: String, desc: String)
+  extends StatefulOperatorCustomMetric {
+  override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
+    SQLMetrics.createMetric(sparkContext, desc)
+}
+
 /** An operator that reads from a StateStore. */
 trait StateStoreReader extends StatefulOperator {
   override lazy val metrics = Map(
@@ -75,7 +94,7 @@ trait StateStoreReader extends StatefulOperator {
 /** An operator that writes to a StateStore. */
 trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
 
-  override lazy val metrics = Map(
+  override lazy val metrics = statefulOperatorCustomMetrics ++ Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
     "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
       "number of rows which are dropped by watermark"),
@@ -92,7 +111,7 @@ trait StateStoreWriter extends StatefulOperator { self: 
SparkPlan =>
    * the driver after this SparkPlan has been executed and metrics have been 
updated.
    */
   def getProgress(): StateOperatorProgress = {
-    val customMetrics = stateStoreCustomMetrics
+    val customMetrics = (stateStoreCustomMetrics ++ 
statefulOperatorCustomMetrics)
       .map(entry => entry._1 -> longMetric(entry._1).value)
 
     val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
@@ -130,6 +149,19 @@ trait StateStoreWriter extends StatefulOperator { self: 
SparkPlan =>
     }.toMap
   }
 
+  /**
+   * Set of stateful operator custom metrics. These are captured as part of 
the generic
+   * key-value map [[StateOperatorProgress.customMetrics]].
+   * Stateful operators can override this method to provide their own unique 
custom metrics.
+   */
+  protected def customStatefulOperatorMetrics: 
Seq[StatefulOperatorCustomMetric] = Nil
+
+  private def statefulOperatorCustomMetrics: Map[String, SQLMetric] = {
+    customStatefulOperatorMetrics.map {
+      metric => (metric.name, metric.createSQLMetric(sparkContext))
+    }.toMap
+  }
+
   protected def applyRemovingRowsOlderThanWatermark(
       iter: Iterator[InternalRow],
       predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow] = {
@@ -468,11 +500,11 @@ case class StreamingDeduplicateExec(
       Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false")) { 
(store, iter) =>
       val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
       val numOutputRows = longMetric("numOutputRows")
-      val numTotalStateRows = longMetric("numTotalStateRows")
       val numUpdatedStateRows = longMetric("numUpdatedStateRows")
       val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
       val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
       val commitTimeMs = longMetric("commitTimeMs")
+      val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows")
 
       val baseIterator = watermarkPredicateForData match {
         case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, 
predicate)
@@ -492,6 +524,7 @@ case class StreamingDeduplicateExec(
           true
         } else {
           // Drop duplicated rows
+          numDroppedDuplicateRows += 1
           false
         }
       }
@@ -509,6 +542,10 @@ case class StreamingDeduplicateExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  override def customStatefulOperatorMetrics: 
Seq[StatefulOperatorCustomMetric] = {
+    Seq(StatefulOperatorCustomSumMetric("numDroppedDuplicateRows", "number of 
duplicates dropped"))
+  }
+
   override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean 
= {
     eventTimeWatermark.isDefined && newMetadata.batchWatermarkMs > 
eventTimeWatermark.get
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
index be83f0e..5073723 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
@@ -32,25 +32,14 @@ trait StateStoreMetricsTest extends StreamTest {
   def assertNumStateRows(
       total: Seq[Long],
       updated: Seq[Long],
-      droppedByWatermark: Seq[Long]): AssertOnQuery =
+      droppedByWatermark: Seq[Long]): AssertOnQuery = {
     AssertOnQuery(s"Check total state rows = $total, updated state rows = 
$updated" +
       s", rows dropped by watermark = $droppedByWatermark") { q =>
       // This assumes that the streaming query will not make any progress 
while the eventually
       // is being executed.
       eventually(timeout(streamingTimeout)) {
-        val recentProgress = q.recentProgress
-        require(recentProgress.nonEmpty, "No progress made, cannot check num 
state rows")
-        require(recentProgress.length < 
spark.sessionState.conf.streamingProgressRetention,
-          "This test assumes that all progresses are present in 
q.recentProgress but " +
-            "some may have been dropped due to retention limits")
-
-        if (q.ne(lastQuery)) lastCheckedRecentProgressIndex = -1
-        lastQuery = q
-
-        val numStateOperators = recentProgress.last.stateOperators.length
-        val progressesSinceLastCheck = recentProgress
-          .slice(lastCheckedRecentProgressIndex + 1, recentProgress.length)
-          .filter(_.stateOperators.length == numStateOperators)
+        val (progressesSinceLastCheck, lastCheckedProgressIndex, 
numStateOperators) =
+          retrieveProgressesSinceLastCheck(q)
 
         val allNumUpdatedRowsSinceLastCheck =
           progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
@@ -61,7 +50,7 @@ trait StateStoreMetricsTest extends StreamTest {
         lazy val debugString = "recent progresses:\n" +
           progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
 
-        val numTotalRows = 
recentProgress.last.stateOperators.map(_.numRowsTotal)
+        val numTotalRows = 
progressesSinceLastCheck.last.stateOperators.map(_.numRowsTotal)
         assert(numTotalRows === total, s"incorrect total rows, $debugString")
 
         val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, 
numStateOperators)
@@ -72,10 +61,36 @@ trait StateStoreMetricsTest extends StreamTest {
         assert(numRowsDroppedByWatermark === droppedByWatermark,
           s"incorrect dropped rows by watermark, $debugString")
 
-        lastCheckedRecentProgressIndex = recentProgress.length - 1
+        advanceLastCheckedRecentProgressIndex(lastCheckedProgressIndex)
+      }
+      true
+    }
+  }
+
+  /** AssertOnQuery to verify the given state operator's custom metric has 
expected value */
+  def assertStateOperatorCustomMetric(
+      metric: String, expected: Long, operatorIndex: Int = 0): AssertOnQuery = 
{
+    AssertOnQuery(s"Check metrics $metric has value $expected") { q =>
+      eventually(timeout(streamingTimeout)) {
+        val (progressesSinceLastCheck, lastCheckedProgressIndex, 
numStateOperators) =
+          retrieveProgressesSinceLastCheck(q)
+        assert(operatorIndex < numStateOperators, s"Invalid operator Index: 
$operatorIndex")
+
+        val allCustomMetricValuesSinceLastCheck = progressesSinceLastCheck
+          .map(_.stateOperators(operatorIndex).customMetrics.get(metric))
+          .map(Long2long)
+
+        lazy val debugString = "recent progresses:\n" +
+          progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
+
+        assert(allCustomMetricValuesSinceLastCheck.sum === expected,
+          s"incorrect custom metric ($metric), $debugString")
+
+        advanceLastCheckedRecentProgressIndex(lastCheckedProgressIndex)
       }
       true
     }
+  }
 
   def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery 
= {
     assert(total.length === updated.length)
@@ -96,4 +111,27 @@ trait StateStoreMetricsTest extends StreamTest {
       "Arrays are of different lengths:\n" + 
arraySeq.map(_.toSeq).mkString("\n"))
     (0 until arrayLength).map { index => arraySeq.map(_.apply(index)).sum }
   }
+
+  def retrieveProgressesSinceLastCheck(
+      execution: StreamExecution): (Array[StreamingQueryProgress], Int, Int) = 
{
+    val recentProgress = execution.recentProgress
+    require(recentProgress != null, "No progress made")
+    require(recentProgress.length < 
spark.sessionState.conf.streamingProgressRetention,
+      "This test assumes that all progresses are present in q.recentProgress 
but " +
+        "some may have been dropped due to retention limits")
+
+    if (execution.ne(lastQuery)) lastCheckedRecentProgressIndex = -1
+    lastQuery = execution
+
+    val numStateOperators = recentProgress.last.stateOperators.length
+    val recentProgresses = recentProgress
+      .slice(lastCheckedRecentProgressIndex + 1, recentProgress.length)
+      .filter(_.stateOperators.length == numStateOperators)
+
+    (recentProgresses, recentProgress.length - 1, 
recentProgresses.last.stateOperators.length)
+  }
+
+  def advanceLastCheckedRecentProgressIndex(newLastCheckedRecentProgressIndex: 
Int): Unit = {
+    lastCheckedRecentProgressIndex = newLastCheckedRecentProgressIndex
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index ac9cd1a..dc2e787 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -332,4 +332,53 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
     }
   }
 
+  test("SPARK-35880: custom metric numDroppedDuplicateRows in state operator 
progress") {
+    val dedupeInputData = MemoryStream[(String, Int)]
+    val dedupe = dedupeInputData.toDS().dropDuplicates("_1")
+
+    testStream(dedupe, Append)(
+      AddData(dedupeInputData, "a" -> 1),
+      CheckLastBatch("a" -> 1),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 0),
+
+      AddData(dedupeInputData, "a" -> 2, "b" -> 3),
+      CheckLastBatch("b" -> 3),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1),
+
+      AddData(dedupeInputData, "a" -> 5, "b" -> 2, "c" -> 9),
+      CheckLastBatch("c" -> 9),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 2)
+    )
+
+    // with watermark
+    val dedupeWithWMInputData = MemoryStream[Int]
+    val dedupeWithWatermark = dedupeWithWMInputData.toDS()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates()
+      .select($"eventTime".cast("long").as[Long])
+
+    testStream(dedupeWithWatermark, Append)(
+      AddData(dedupeWithWMInputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+      CheckAnswer(10 to 15: _*),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 
24),
+
+      AddData(dedupeWithWMInputData, 14),
+      CheckNewAnswer(),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1),
+
+      // Advance watermark to 15 secs, no-data-batch drops rows <= 15
+      AddData(dedupeWithWMInputData, 25),
+      CheckNewAnswer(25),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 0),
+
+      AddData(dedupeWithWMInputData, 10), // Should not emit anything as data 
less than watermark
+      CheckNewAnswer(),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 0),
+
+      AddData(dedupeWithWMInputData, 26, 26),
+      CheckNewAnswer(26),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1)
+    )
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to