[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21126 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183912999 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -207,62 +209,126 @@ trait ProgressReporter extends Logging { return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp) } -// We want to associate execution plan leaves to sources that generate them, so that we match -// the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. -// Consider the translation from the streaming logical plan to the final executed plan. -// -// streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan -// -// 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan -//- Each logical plan leaf will be associated with a single streaming source. -//- There can be multiple logical plan leaves associated with a streaming source. -//- There can be leaves not associated with any streaming source, because they were -// generated from a batch source (e.g. stream-batch joins) -// -// 2. Assuming that the executed plan has same number of leaves in the same order as that of -//the trigger logical plan, we associate executed plan leaves with corresponding -//streaming sources. -// -// 3. For each source, we sum the metrics of the associated execution plan leaves. 
-// -val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) => - logicalPlan.collectLeaves().map { leaf => leaf -> source } +val numInputRows = extractSourceToNumInputRows() + +val eventTimeStats = lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => +val stats = e.eventTimeStats.value +Map( + "max" -> stats.max, + "min" -> stats.min, + "avg" -> stats.avg.toLong).mapValues(formatTimestamp) +}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp + +ExecutionStats(numInputRows, stateOperators, eventTimeStats) + } + + /** Extract number of input sources for each streaming source in plan */ + private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = { + +import java.util.IdentityHashMap +import scala.collection.JavaConverters._ + +def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = { + tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source } -val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming -val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() -val numInputRows: Map[BaseStreamingSource, Long] = + +val onlyDataSourceV2Sources = { + // Check whether the streaming query's logical plan has only V2 data sources + val allStreamingLeaves = +logicalPlan.collect { case s: StreamingExecutionRelation => s } + allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] } +} + +if (onlyDataSourceV2Sources) { + // DataSourceV2ScanExec is the execution plan leaf that is responsible for reading data + // from a V2 source and has a direct reference to the V2 source that generated it. Each + // DataSourceV2ScanExec records the number of rows it has read using SQLMetrics. 
However, + // just collecting all DataSourceV2ScanExec nodes and getting the metric is not correct as + // a DataSourceV2ScanExec instance may be referred to in the execution plan from two (or + // even multiple times) points and considering it twice will leads to double counting. We + // can't dedup them using their hashcode either because two different instances of + // DataSourceV2ScanExec can have the same hashcode but account for separate sets of + // records read, and deduping them to consider only one of them would be undercounting the + // records read. Therefore the right way to do this is to consider the unique instances of + // DataSourceV2ScanExec (using their identity hash codes) and get metrics from them. + // Hence we calculate in the following way. + // + // 1. Collect all the unique DataSourceV2ScanExec instances using IdentityHashMap. + // + // 2. Extract the source and the number of rows read from the DataSourceV2ScanExec instanes. + // + // 3. Multiple DataSourceV2ScanExec instance may refer to the same source (can happen with +
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183897545 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -207,62 +209,126 @@ trait ProgressReporter extends Logging { return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp) } -// We want to associate execution plan leaves to sources that generate them, so that we match -// the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. -// Consider the translation from the streaming logical plan to the final executed plan. -// -// streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan -// -// 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan -//- Each logical plan leaf will be associated with a single streaming source. -//- There can be multiple logical plan leaves associated with a streaming source. -//- There can be leaves not associated with any streaming source, because they were -// generated from a batch source (e.g. stream-batch joins) -// -// 2. Assuming that the executed plan has same number of leaves in the same order as that of -//the trigger logical plan, we associate executed plan leaves with corresponding -//streaming sources. -// -// 3. For each source, we sum the metrics of the associated execution plan leaves. 
-// -val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) => - logicalPlan.collectLeaves().map { leaf => leaf -> source } +val numInputRows = extractSourceToNumInputRows() + +val eventTimeStats = lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => +val stats = e.eventTimeStats.value +Map( + "max" -> stats.max, + "min" -> stats.min, + "avg" -> stats.avg.toLong).mapValues(formatTimestamp) +}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp + --- End diff -- The code above stayed the same. The diff is pretty dumb.
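The event-time block quoted above works in three steps. It picks the first watermark operator that saw any rows, formats that operator's max/min/avg, and then merges in the watermark entry. Below is a plain-Scala sketch of that shape. It assumes hypothetical `EventTimeStats`/`WatermarkNode` types and an ISO-8601 `formatTimestamp` built on `java.time.Instant`; Spark's own formatter may differ. It also uses a strict `.map` where the quoted code uses `mapValues`.

```scala
import java.time.Instant

// Hypothetical stand-ins (NOT Spark's EventTimeStats / EventTimeWatermarkExec).
final case class EventTimeStats(max: Long, min: Long, avg: Double, count: Long)
sealed trait PlanNode
final case class WatermarkNode(stats: EventTimeStats) extends PlanNode
final case class OtherNode(name: String) extends PlanNode

// Assumed formatter: epoch millis -> ISO-8601 UTC string.
def formatTimestamp(millis: Long): String = Instant.ofEpochMilli(millis).toString

// First watermark node that saw rows wins; watermark entries are merged in last.
def eventTimeSummary(plan: Seq[PlanNode], watermark: Map[String, String]): Map[String, String] =
  plan.collect {
    case WatermarkNode(s) if s.count > 0 =>
      Map("max" -> s.max, "min" -> s.min, "avg" -> s.avg.toLong)
        .map { case (k, v) => k -> formatTimestamp(v) } // strict, unlike mapValues
  }.headOption.getOrElse(Map.empty[String, String]) ++ watermark
```

If no watermark operator saw data, the result is just the watermark map.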
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183660795 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -207,62 +209,92 @@ trait ProgressReporter extends Logging { return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp) } -// We want to associate execution plan leaves to sources that generate them, so that we match -// the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. -// Consider the translation from the streaming logical plan to the final executed plan. -// -// streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan -// -// 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan -//- Each logical plan leaf will be associated with a single streaming source. -//- There can be multiple logical plan leaves associated with a streaming source. -//- There can be leaves not associated with any streaming source, because they were -// generated from a batch source (e.g. stream-batch joins) -// -// 2. Assuming that the executed plan has same number of leaves in the same order as that of -//the trigger logical plan, we associate executed plan leaves with corresponding -//streaming sources. -// -// 3. For each source, we sum the metrics of the associated execution plan leaves. 
-// -val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) => - logicalPlan.collectLeaves().map { leaf => leaf -> source } +val numInputRows = extractSourceToNumInputRows() + +val eventTimeStats = lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => +val stats = e.eventTimeStats.value +Map( + "max" -> stats.max, + "min" -> stats.min, + "avg" -> stats.avg.toLong).mapValues(formatTimestamp) +}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp + +ExecutionStats(numInputRows, stateOperators, eventTimeStats) + } + + /** Extract number of input sources for each streaming source in plan */ + private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = { + +def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = { + tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source } -val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming -val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() -val numInputRows: Map[BaseStreamingSource, Long] = + +val onlyDataSourceV2Sources = { + // Check whether the streaming query's logical plan has only V2 data sources + val allStreamingLeaves = +logicalPlan.collect { case s: StreamingExecutionRelation => s } + allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] } --- End diff -- Yeah. This code path is not used by continuous processing.
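`sumRows` in the quoted diff groups `(source, rows)` pairs and sums them per source using `mapValues`. In Scala 2.12, `mapValues` returns a lazy view that re-applies the function on every lookup, and in 2.13 `Map.mapValues` is deprecated. A strict version may therefore be preferable. Below is a generic sketch in which the type parameter `S` stands in for `BaseStreamingSource`. The fold variant is an alternative, not what the PR uses.

```scala
// `S` stands in for BaseStreamingSource; only its equals/hashCode matter for grouping.
// Strict version of the diff's groupBy + mapValues.
def sumRows[S](tuples: Seq[(S, Long)]): Map[S, Long] =
  tuples.groupBy(_._1).map { case (source, pairs) => source -> pairs.map(_._2).sum }

// Single-pass alternative that avoids building the intermediate groups.
def sumRowsFold[S](tuples: Seq[(S, Long)]): Map[S, Long] =
  tuples.foldLeft(Map.empty[S, Long]) { case (acc, (source, rows)) =>
    acc.updated(source, acc.getOrElse(source, 0L) + rows)
  }
```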
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183660623 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.sources(0).numInputRows === 10) } + + test("input row calculation with trigger having data for one of two V2 sources") { +val streamInput1 = MemoryStream[Int] +val streamInput2 = MemoryStream[Int] + +testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)( + AddData(streamInput1, 1, 2, 3), + CheckAnswer(1, 2, 3), + AssertOnQuery { q => +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 2) +assert(lastProgress.get.sources(0).numInputRows == 3) +assert(lastProgress.get.sources(1).numInputRows == 0) +true + } +) + } + + test("input row calculation with mixed batch and streaming V2 sources") { + +val streamInput = MemoryStream[Int] +val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue") + +testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)( + AddData(streamInput, 1, 2, 3), + AssertOnQuery { q => +q.processAllAvailable() + +// The number of leaves in the trigger's logical plan should be same as the executed plan. 
+require( + q.lastExecution.logical.collectLeaves().length == +q.lastExecution.executedPlan.collectLeaves().length) + +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 1) +assert(lastProgress.get.sources(0).numInputRows == 3) +true + } +) + +val streamInput2 = MemoryStream[Int] +val staticInputDF2 = staticInputDF.union(staticInputDF).cache() + +testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)( --- End diff -- Turns out things were broken for self-joins and self-unions. Updated the logic and added tests for those for V2 sources.
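The `require` in this test guards the assumption behind the removed approach. That approach pairs trigger-logical-plan leaves with executed-plan leaves by position and then sums per source. Below is a sketch of that positional association using hypothetical `LogicalLeaf`/`ExecLeaf` types. It shows why equal leaf counts are needed (this sketch simply returns `None` when they differ) and why leaves from batch sources, which have no streaming source, are skipped.

```scala
// Hypothetical leaf types (NOT Spark plan classes); case-class equality is fine here
// because each logical leaf gets a distinct id.
final case class LogicalLeaf(id: Int)
final case class ExecLeaf(rowsRead: Long)

// Positional association: i-th logical leaf <-> i-th executed leaf.
// Returns None when the leaf counts differ, since the pairing would be meaningless.
def rowsByPosition(
    logicalLeaves: Seq[LogicalLeaf],
    execLeaves: Seq[ExecLeaf],
    leafToSource: Map[LogicalLeaf, String]): Option[Map[String, Long]] =
  if (logicalLeaves.length != execLeaves.length) None
  else Some(
    logicalLeaves.zip(execLeaves)
      .flatMap { case (l, e) => leafToSource.get(l).map(src => src -> e.rowsRead) } // batch leaves have no source
      .groupBy(_._1)
      .map { case (src, pairs) => src -> pairs.map(_._2).sum })
```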
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183605577 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.sources(0).numInputRows === 10) } + + test("input row calculation with trigger having data for one of two V2 sources") { +val streamInput1 = MemoryStream[Int] +val streamInput2 = MemoryStream[Int] + +testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)( + AddData(streamInput1, 1, 2, 3), + CheckAnswer(1, 2, 3), + AssertOnQuery { q => +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 2) +assert(lastProgress.get.sources(0).numInputRows == 3) +assert(lastProgress.get.sources(1).numInputRows == 0) +true + } --- End diff -- nit: I'd suggest doing an AddData() for the other stream afterwards, to make sure there's no weird order dependence.
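This test expects an entry for every source, with 0 rows for the source that received no data. The nit asks for a check that the result does not depend on which source gets data first. One way to get both properties is to build the report from the query's ordered source list and default missing counts to 0. Below is a sketch using a hypothetical `SourceReport` type, which is not Spark's `SourceProgress` class. Whether Spark builds its report this way is not shown here.

```scala
// Hypothetical per-source report entry (NOT Spark's SourceProgress class).
final case class SourceReport(source: String, numInputRows: Long)

// One entry per source, in the query's source order, with 0 for sources
// that produced no rows in this trigger; map iteration order never matters.
def reportPerSource(sources: Seq[String], counted: Map[String, Long]): Seq[SourceReport] =
  sources.map(s => SourceReport(s, counted.getOrElse(s, 0L)))
```

Data arriving only on the second source still lands in the second slot, which is the case the suggested extra `AddData()` would exercise.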
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183604136 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -207,62 +209,92 @@ trait ProgressReporter extends Logging { return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp) } -// We want to associate execution plan leaves to sources that generate them, so that we match -// the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. -// Consider the translation from the streaming logical plan to the final executed plan. -// -// streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan -// -// 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan -//- Each logical plan leaf will be associated with a single streaming source. -//- There can be multiple logical plan leaves associated with a streaming source. -//- There can be leaves not associated with any streaming source, because they were -// generated from a batch source (e.g. stream-batch joins) -// -// 2. Assuming that the executed plan has same number of leaves in the same order as that of -//the trigger logical plan, we associate executed plan leaves with corresponding -//streaming sources. -// -// 3. For each source, we sum the metrics of the associated execution plan leaves. 
-// -val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) => - logicalPlan.collectLeaves().map { leaf => leaf -> source } +val numInputRows = extractSourceToNumInputRows() + +val eventTimeStats = lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => +val stats = e.eventTimeStats.value +Map( + "max" -> stats.max, + "min" -> stats.min, + "avg" -> stats.avg.toLong).mapValues(formatTimestamp) +}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp + +ExecutionStats(numInputRows, stateOperators, eventTimeStats) + } + + /** Extract number of input sources for each streaming source in plan */ + private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = { + +def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = { + tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source } -val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming -val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() -val numInputRows: Map[BaseStreamingSource, Long] = + +val onlyDataSourceV2Sources = { + // Check whether the streaming query's logical plan has only V2 data sources + val allStreamingLeaves = +logicalPlan.collect { case s: StreamingExecutionRelation => s } + allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] } --- End diff -- A point fix here won't be sufficient: right now the metrics don't make it to the driver at all in continuous processing.
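The `onlyDataSourceV2Sources` check is a `forall` over the collected `StreamingExecutionRelation` leaves. Note that `forall` on an empty collection returns `true`, so a plan with no such leaves takes the V2 branch by default. Whether that matters depends on which plans reach this code path. Below is a sketch with a hypothetical source hierarchy, which is not Spark's `BaseStreamingSource`/`MicroBatchReader`.

```scala
// Hypothetical source hierarchy (NOT Spark's BaseStreamingSource / MicroBatchReader).
trait StreamingSource
trait MicroBatchLike extends StreamingSource
final class V1Source extends StreamingSource
final class V2Reader extends MicroBatchLike

final case class StreamingLeaf(source: StreamingSource)

// Mirrors the shape of the diff's check: every collected streaming leaf must be a V2 reader.
def onlyV2Sources(leaves: Seq[StreamingLeaf]): Boolean =
  leaves.forall(_.source.isInstanceOf[MicroBatchLike])
```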
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183559926 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -207,62 +209,92 @@ trait ProgressReporter extends Logging { return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp) } -// We want to associate execution plan leaves to sources that generate them, so that we match -// the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. -// Consider the translation from the streaming logical plan to the final executed plan. -// -// streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan -// -// 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan -//- Each logical plan leaf will be associated with a single streaming source. -//- There can be multiple logical plan leaves associated with a streaming source. -//- There can be leaves not associated with any streaming source, because they were -// generated from a batch source (e.g. stream-batch joins) -// -// 2. Assuming that the executed plan has same number of leaves in the same order as that of -//the trigger logical plan, we associate executed plan leaves with corresponding -//streaming sources. -// -// 3. For each source, we sum the metrics of the associated execution plan leaves. 
-// -val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) => - logicalPlan.collectLeaves().map { leaf => leaf -> source } +val numInputRows = extractSourceToNumInputRows() + +val eventTimeStats = lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => +val stats = e.eventTimeStats.value +Map( + "max" -> stats.max, + "min" -> stats.min, + "avg" -> stats.avg.toLong).mapValues(formatTimestamp) +}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp + +ExecutionStats(numInputRows, stateOperators, eventTimeStats) + } + + /** Extract number of input sources for each streaming source in plan */ + private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = { + +def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = { + tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source } -val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming -val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() -val numInputRows: Map[BaseStreamingSource, Long] = + +val onlyDataSourceV2Sources = { + // Check whether the streaming query's logical plan has only V2 data sources + val allStreamingLeaves = +logicalPlan.collect { case s: StreamingExecutionRelation => s } + allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] } --- End diff -- Maybe I can make it work for continuous processing as well with a small tweak.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183559395 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.sources(0).numInputRows === 10) } + + test("input row calculation with trigger having data for one of two V2 sources") { +val streamInput1 = MemoryStream[Int] +val streamInput2 = MemoryStream[Int] + +testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)( + AddData(streamInput1, 1, 2, 3), + CheckAnswer(1, 2, 3), + AssertOnQuery { q => +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 2) +assert(lastProgress.get.sources(0).numInputRows == 3) +assert(lastProgress.get.sources(1).numInputRows == 0) +true + } +) + } + + test("input row calculation with mixed batch and streaming V2 sources") { + +val streamInput = MemoryStream[Int] +val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue") + +testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)( + AddData(streamInput, 1, 2, 3), + AssertOnQuery { q => +q.processAllAvailable() + +// The number of leaves in the trigger's logical plan should be same as the executed plan. 
+require( + q.lastExecution.logical.collectLeaves().length == +q.lastExecution.executedPlan.collectLeaves().length) + +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 1) +assert(lastProgress.get.sources(0).numInputRows == 3) +true + } +) + +val streamInput2 = MemoryStream[Int] +val staticInputDF2 = staticInputDF.union(staticInputDF).cache() + +testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)( --- End diff -- Then there will be two DataSourceV2ScanExec instances reading from the same location, so we will be reading the data twice, and the counts will reflect that. But yes, I should add a test for that.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183559165 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.sources(0).numInputRows === 10) } + + test("input row calculation with trigger having data for one of two V2 sources") { +val streamInput1 = MemoryStream[Int] +val streamInput2 = MemoryStream[Int] + +testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)( + AddData(streamInput1, 1, 2, 3), + CheckAnswer(1, 2, 3), + AssertOnQuery { q => +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 2) +assert(lastProgress.get.sources(0).numInputRows == 3) +assert(lastProgress.get.sources(1).numInputRows == 0) +true + } +) + } + + test("input row calculation with mixed batch and streaming V2 sources") { + +val streamInput = MemoryStream[Int] +val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue") + +testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)( + AddData(streamInput, 1, 2, 3), + AssertOnQuery { q => +q.processAllAvailable() + +// The number of leaves in the trigger's logical plan should be same as the executed plan. 
+require( + q.lastExecution.logical.collectLeaves().length == +q.lastExecution.executedPlan.collectLeaves().length) + +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 1) +assert(lastProgress.get.sources(0).numInputRows == 3) +true + } +) + +val streamInput2 = MemoryStream[Int] +val staticInputDF2 = staticInputDF.union(staticInputDF).cache() --- End diff -- It really doesn't matter, as the test suite will shut down the SparkContext anyway.
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183542367 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.sources(0).numInputRows === 10) } + + test("input row calculation with trigger having data for one of two V2 sources") { +val streamInput1 = MemoryStream[Int] +val streamInput2 = MemoryStream[Int] + +testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)( + AddData(streamInput1, 1, 2, 3), + CheckAnswer(1, 2, 3), + AssertOnQuery { q => +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 2) +assert(lastProgress.get.sources(0).numInputRows == 3) +assert(lastProgress.get.sources(1).numInputRows == 0) +true + } +) + } + + test("input row calculation with mixed batch and streaming V2 sources") { + +val streamInput = MemoryStream[Int] +val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue") + +testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)( + AddData(streamInput, 1, 2, 3), + AssertOnQuery { q => +q.processAllAvailable() + +// The number of leaves in the trigger's logical plan should be same as the executed plan. 
+require( + q.lastExecution.logical.collectLeaves().length == +q.lastExecution.executedPlan.collectLeaves().length) + +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 1) +assert(lastProgress.get.sources(0).numInputRows == 3) +true + } +) + +val streamInput2 = MemoryStream[Int] +val staticInputDF2 = staticInputDF.union(staticInputDF).cache() + +testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)( --- End diff -- E.g., a self-join?
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183533297 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -207,62 +209,92 @@ trait ProgressReporter extends Logging { return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp) } -// We want to associate execution plan leaves to sources that generate them, so that we match -// the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. -// Consider the translation from the streaming logical plan to the final executed plan. -// -// streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan -// -// 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan -//- Each logical plan leaf will be associated with a single streaming source. -//- There can be multiple logical plan leaves associated with a streaming source. -//- There can be leaves not associated with any streaming source, because they were -// generated from a batch source (e.g. stream-batch joins) -// -// 2. Assuming that the executed plan has same number of leaves in the same order as that of -//the trigger logical plan, we associate executed plan leaves with corresponding -//streaming sources. -// -// 3. For each source, we sum the metrics of the associated execution plan leaves. 
-// -val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) => - logicalPlan.collectLeaves().map { leaf => leaf -> source } +val numInputRows = extractSourceToNumInputRows() + +val eventTimeStats = lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => +val stats = e.eventTimeStats.value +Map( + "max" -> stats.max, + "min" -> stats.min, + "avg" -> stats.avg.toLong).mapValues(formatTimestamp) +}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp + +ExecutionStats(numInputRows, stateOperators, eventTimeStats) + } + + /** Extract number of input sources for each streaming source in plan */ + private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = { + +def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = { + tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source } -val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming -val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() -val numInputRows: Map[BaseStreamingSource, Long] = + +val onlyDataSourceV2Sources = { + // Check whether the streaming query's logical plan has only V2 data sources + val allStreamingLeaves = +logicalPlan.collect { case s: StreamingExecutionRelation => s } + allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] } --- End diff -- We don't have a way to track these for continuous processing at the moment?
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/21126#discussion_r183542307 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.sources(0).numInputRows === 10) } + + test("input row calculation with trigger having data for one of two V2 sources") { +val streamInput1 = MemoryStream[Int] +val streamInput2 = MemoryStream[Int] + +testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)( + AddData(streamInput1, 1, 2, 3), + CheckAnswer(1, 2, 3), + AssertOnQuery { q => +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 2) +assert(lastProgress.get.sources(0).numInputRows == 3) +assert(lastProgress.get.sources(1).numInputRows == 0) +true + } +) + } + + test("input row calculation with mixed batch and streaming V2 sources") { + +val streamInput = MemoryStream[Int] +val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue") + +testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)( + AddData(streamInput, 1, 2, 3), + AssertOnQuery { q => +q.processAllAvailable() + +// The number of leaves in the trigger's logical plan should be same as the executed plan. 
+require( + q.lastExecution.logical.collectLeaves().length == +q.lastExecution.executedPlan.collectLeaves().length) + +val lastProgress = getLastProgressWithData(q) +assert(lastProgress.nonEmpty) +assert(lastProgress.get.numInputRows == 3) +assert(lastProgress.get.sources.length == 1) +assert(lastProgress.get.sources(0).numInputRows == 3) +true + } +) + +val streamInput2 = MemoryStream[Int] +val staticInputDF2 = staticInputDF.union(staticInputDF).cache() + +testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)( --- End diff -- What if you do a stream-stream join?
[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183542025

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -733,6 +804,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
+  def getLastProgressWithData(q: StreamingQuery): Option[StreamingQueryProgress] = {
+    q.recentProgress.filter(_.numInputRows > 0).lastOption
+  }
+
+
--- End diff --

nit: extra line
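The helper returns the most recent progress update whose trigger actually read rows, skipping empty triggers. Below is a standalone model of the same selection logic, assuming a hypothetical `Progress` case class in place of `StreamingQueryProgress`; the second variant is an equivalent alternative that scans from the end and stops at the first match instead of filtering the whole history.

```scala
// Hypothetical stand-in for StreamingQueryProgress; only the fields the helper needs.
final case class Progress(batchId: Long, numInputRows: Long)

object ProgressSelection {
  /** Same logic as the helper in the diff: keep triggers with data, take the last one. */
  def lastWithData(recent: Seq[Progress]): Option[Progress] =
    recent.filter(_.numInputRows > 0).lastOption

  /** Equivalent variant: walk backwards and stop at the first trigger with data. */
  def lastWithDataFromEnd(recent: Seq[Progress]): Option[Progress] =
    recent.reverseIterator.find(_.numInputRows > 0)
}
```

Both return `None` when no trigger read any rows, which is why the tests assert `lastProgress.nonEmpty` before calling `.get`.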
[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183541978

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     assert(progress.sources(0).numInputRows === 10)
   }
+
+  test("input row calculation with trigger having data for one of two V2 sources") {
+    val streamInput1 = MemoryStream[Int]
+    val streamInput2 = MemoryStream[Int]
+
+    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
+      AddData(streamInput1, 1, 2, 3),
+      CheckAnswer(1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 2)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        assert(lastProgress.get.sources(1).numInputRows == 0)
+        true
+      }
+    )
+  }
+
+  test("input row calculation with mixed batch and streaming V2 sources") {
+
+    val streamInput = MemoryStream[Int]
+    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
+
+    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
+      AddData(streamInput, 1, 2, 3),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+
+        // The number of leaves in the trigger's logical plan should be same as the executed plan.
+        require(
+          q.lastExecution.logical.collectLeaves().length ==
+            q.lastExecution.executedPlan.collectLeaves().length)
+
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        true
+      }
+    )
+
+    val streamInput2 = MemoryStream[Int]
+    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
--- End diff --

nit: unpersist later?
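One way to address the nit is to release the cached DataFrame in a `finally` block, so the cache is dropped even when an assertion in the test fails. The helper below is a generic loan-pattern sketch in plain Scala; `Cleanup` and `withResource` are hypothetical names, not part of Spark or its test harness.

```scala
object Cleanup {
  /**
   * Acquires a resource, runs `body` with it, and always runs `release`
   * afterwards, even if `body` throws.
   */
  def withResource[R, T](acquire: => R)(release: R => Unit)(body: R => T): T = {
    val resource = acquire
    try body(resource)
    finally release(resource)
  }
}
```

Applied to this test, `acquire` would be `staticInputDF.union(staticInputDF).cache()`, `release` would be `_.unpersist()` (the `Dataset.unpersist()` call), and `body` would wrap the second `testStream(...)` call.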
[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/21126

[SPARK-24050][SS] Calculate input / processing rates correctly for DataSourceV2 streaming sources

## What changes were proposed in this pull request?

In some streaming queries, the input and processing rates are not calculated at all (they show up as zero) because MicroBatchExecution fails to associate metrics from the executed plan of a trigger with the sources in the logical plan of the trigger. This executed-plan-leaf-to-logical-source attribution works as follows.

With V1 sources, there was no way to identify which execution plan leaves were generated by a streaming source. So we made a best-effort attempt to match logical and execution plan leaves when the number of leaves was the same. When the number of leaves differs, we just give up and report zero rates. An example where this may happen:

```
val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
val streamingInputDF = ...
val query = streamingInputDF.join(cachedStaticDF).writeStream
```

In this case, `cachedStaticDF` has multiple logical leaves, but in the trigger's execution plan it has only one leaf, because a cached subplan is represented as a single InMemoryTableScanExec leaf. The resulting mismatch in the number of leaves causes the input rates to be computed as zero.

With DataSourceV2, all inputs are represented in the executed plan using `DataSourceV2ScanExec`, each of which has a reference to the associated logical `DataSource` and `DataSourceReader`. So it's easy to associate the metrics with the original streaming sources.

In this PR, the solution is as follows: if all the streaming sources in a streaming query are V2 sources, use a new code path where the execution-metrics-to-source mapping is done directly. Otherwise, fall back to the existing mapping logic.

## How was this patch tested?
- New unit tests using V2 memory source
- Existing unit tests using V1 source

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-24050

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21126.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21126

commit d485db8ec70a8bd8e2fff488e75be08d384ceef0
Author: Tathagata Das
Date: 2018-04-23T06:20:14Z

    SPARK-24050
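The two attribution paths described in the PR description can be modeled in a few lines of plain Scala. This is a simplified sketch under stated assumptions, not the code in `ProgressReporter`: plans are reduced to flat lists of leaves, and `LogicalLeaf`, `ExecLeaf`, `RateAttribution`, and the source names are hypothetical. The V1 path pairs logical and executed leaves by position only when the counts match and otherwise reports zero; the V2 path reads the source directly from each scan leaf.

```scala
// Simplified model of attributing executed-plan metrics to streaming sources.
// None of these types are Spark classes.

// A logical-plan leaf; `source` is None for leaves built from batch (static) data.
final case class LogicalLeaf(name: String, source: Option[String])

// An executed-plan leaf with its row metric. `v2Source` models the direct
// source reference a V2 scan carries; V1 leaves have none.
final case class ExecLeaf(name: String, numOutputRows: Long, v2Source: Option[String])

object RateAttribution {
  private def sumBySource(pairs: Seq[(String, Long)]): Map[String, Long] =
    pairs.groupBy(_._1).map { case (src, ps) => src -> ps.map(_._2).sum }

  private def zeros(sources: Seq[String]): Map[String, Long] =
    sources.map(_ -> 0L).toMap

  /** V1 best effort: match leaves by position, but only if the leaf counts agree. */
  def v1(logical: Seq[LogicalLeaf], executed: Seq[ExecLeaf], sources: Seq[String]): Map[String, Long] =
    if (logical.length == executed.length) {
      val pairs = logical.zip(executed).collect {
        case (LogicalLeaf(_, Some(src)), exec) => src -> exec.numOutputRows
      }
      zeros(sources) ++ sumBySource(pairs)
    } else {
      // Leaf counts differ (e.g. a cached subplan collapsed into one scan): give up.
      zeros(sources)
    }

  /** V2: every scan leaf names its source, so no positional matching is needed. */
  def v2(executed: Seq[ExecLeaf], sources: Seq[String]): Map[String, Long] =
    zeros(sources) ++ sumBySource(executed.flatMap(e => e.v2Source.map(_ -> e.numOutputRows)))

  /** Take the direct V2 path only if every streaming source is a V2 source. */
  def attribute(
      allSourcesAreV2: Boolean,
      logical: Seq[LogicalLeaf],
      executed: Seq[ExecLeaf],
      sources: Seq[String]): Map[String, Long] =
    if (allSourcesAreV2) v2(executed, sources) else v1(logical, executed, sources)
}
```

Modeling the `cachedStaticDF` example as three logical leaves (one stream, two static) but only two executed leaves (the stream scan plus one in-memory scan), `v1` reports `0` rows for the stream while `v2` reports the stream scan's actual count.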