Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183533297
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
    @@ -207,62 +209,92 @@ trait ProgressReporter extends Logging {
           return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
         }
     
    -    // We want to associate execution plan leaves to sources that generate them, so that we match
    -    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
    -    // Consider the translation from the streaming logical plan to the final executed plan.
    -    //
    -    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
    -    //
    -    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
    -    //    - Each logical plan leaf will be associated with a single streaming source.
    -    //    - There can be multiple logical plan leaves associated with a streaming source.
    -    //    - There can be leaves not associated with any streaming source, because they were
    -    //      generated from a batch source (e.g. stream-batch joins)
    -    //
    -    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
    -    //    the trigger logical plan, we associate executed plan leaves with corresponding
    -    //    streaming sources.
    -    //
    -    // 3. For each source, we sum the metrics of the associated execution plan leaves.
    -    //
    -    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
    -      logicalPlan.collectLeaves().map { leaf => leaf -> source }
    +    val numInputRows = extractSourceToNumInputRows()
    +
    +    val eventTimeStats = lastExecution.executedPlan.collect {
    +      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
    +        val stats = e.eventTimeStats.value
    +        Map(
    +          "max" -> stats.max,
    +          "min" -> stats.min,
    +          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
    +    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
    +
    +    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
    +  }
    +
    +  /** Extract number of input sources for each streaming source in plan */
    +  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
    +
    +    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
    +      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
         }
    -    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
    -    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
    -    val numInputRows: Map[BaseStreamingSource, Long] =
    +
    +    val onlyDataSourceV2Sources = {
    +      // Check whether the streaming query's logical plan has only V2 data sources
    +      val allStreamingLeaves =
    +        logicalPlan.collect { case s: StreamingExecutionRelation => s }
    +      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
    --- End diff ---
    
    We don't have a way to track these for ContinuousProcessing at the moment?
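
    For reference, a minimal, self-contained sketch (not the PR's actual code) of the per-source aggregation that the new sumRows helper performs. Plain strings stand in for BaseStreamingSource instances, and the sample source names and row counts are made up:

        // Sketch of the groupBy/mapValues aggregation used by sumRows in the diff.
        // Strings stand in for BaseStreamingSource; the Long values are per-leaf row counts.
        object SumRowsSketch {
          def sumRows(tuples: Seq[(String, Long)]): Map[String, Long] =
            tuples.groupBy(_._1)            // group leaf metrics by their source
              .mapValues(_.map(_._2).sum)   // sum the row counts reported for each source
              .toMap                        // materialize (mapValues is a view in Scala 2.13)

          def main(args: Array[String]): Unit = {
            // Two leaves report rows for the same hypothetical source, one for another.
            val leafMetrics = Seq("sourceA" -> 10L, "sourceA" -> 5L, "sourceB" -> 7L)
            println(sumRows(leafMetrics)) // prints something like: Map(sourceA -> 15, sourceB -> 7)
          }
        }

    The groupBy step is what lets several execution plan leaves roll up into one source, matching the "multiple leaves per source" case described in the removed comment block.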
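
    And a rough illustration of the forall check the question above refers to, using hypothetical stand-in traits rather than Spark's real interfaces:

        // Stand-in traits; only the forall pattern is taken from the diff.
        object OnlyV2CheckSketch {
          trait BaseStreamingSource
          trait MicroBatchReader extends BaseStreamingSource
          case class V1Source(name: String) extends BaseStreamingSource
          case class V2Source(name: String) extends MicroBatchReader

          def main(args: Array[String]): Unit = {
            val streamingLeaves: Seq[BaseStreamingSource] = Seq(V2Source("a"), V1Source("b"))
            // false here: the query mixes V1 and V2 sources, so it is not "only data source V2"
            val onlyV2 = streamingLeaves.forall(_.isInstanceOf[MicroBatchReader])
            println(onlyV2)
          }
        }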


---
