[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21126


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-24 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183912999
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
@@ -207,62 +209,126 @@ trait ProgressReporter extends Logging {
       return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
     }
 
-    // We want to associate execution plan leaves to sources that generate them, so that we match
-    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
-    // Consider the translation from the streaming logical plan to the final executed plan.
-    //
-    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
-    //
-    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
-    //    - Each logical plan leaf will be associated with a single streaming source.
-    //    - There can be multiple logical plan leaves associated with a streaming source.
-    //    - There can be leaves not associated with any streaming source, because they were
-    //      generated from a batch source (e.g. stream-batch joins)
-    //
-    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
-    //    the trigger logical plan, we associate executed plan leaves with corresponding
-    //    streaming sources.
-    //
-    // 3. For each source, we sum the metrics of the associated execution plan leaves.
-    //
-    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
-      logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    val numInputRows = extractSourceToNumInputRows()
+
+    val eventTimeStats = lastExecution.executedPlan.collect {
+      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+        val stats = e.eventTimeStats.value
+        Map(
+          "max" -> stats.max,
+          "min" -> stats.min,
+          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
+  }
+
+  /** Extract number of input sources for each streaming source in plan */
+  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
+
+    import java.util.IdentityHashMap
+    import scala.collection.JavaConverters._
+
+    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
+      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
     }
-    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
-    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
-    val numInputRows: Map[BaseStreamingSource, Long] =
+
+    val onlyDataSourceV2Sources = {
+      // Check whether the streaming query's logical plan has only V2 data sources
+      val allStreamingLeaves =
+        logicalPlan.collect { case s: StreamingExecutionRelation => s }
+      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
+    }
+
+    if (onlyDataSourceV2Sources) {
+      // DataSourceV2ScanExec is the execution plan leaf that is responsible for reading data
+      // from a V2 source and has a direct reference to the V2 source that generated it. Each
+      // DataSourceV2ScanExec records the number of rows it has read using SQLMetrics. However,
+      // just collecting all DataSourceV2ScanExec nodes and getting the metric is not correct as
+      // a DataSourceV2ScanExec instance may be referred to in the execution plan from two (or
+      // even multiple times) points and considering it twice will leads to double counting. We
+      // can't dedup them using their hashcode either because two different instances of
+      // DataSourceV2ScanExec can have the same hashcode but account for separate sets of
+      // records read, and deduping them to consider only one of them would be undercounting the
+      // records read. Therefore the right way to do this is to consider the unique instances of
+      // DataSourceV2ScanExec (using their identity hash codes) and get metrics from them.
+      // Hence we calculate in the following way.
+      //
+      // 1. Collect all the unique DataSourceV2ScanExec instances using IdentityHashMap.
+      //
+      // 2. Extract the source and the number of rows read from the DataSourceV2ScanExec instanes.
+      //
+      // 3. Multiple DataSourceV2ScanExec instance may refer to the same source (can happen with

[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183897545
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
@@ -207,62 +209,126 @@ trait ProgressReporter extends Logging {
       return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
     }
 
-    // We want to associate execution plan leaves to sources that generate them, so that we match
-    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
-    // Consider the translation from the streaming logical plan to the final executed plan.
-    //
-    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
-    //
-    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
-    //    - Each logical plan leaf will be associated with a single streaming source.
-    //    - There can be multiple logical plan leaves associated with a streaming source.
-    //    - There can be leaves not associated with any streaming source, because they were
-    //      generated from a batch source (e.g. stream-batch joins)
-    //
-    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
-    //    the trigger logical plan, we associate executed plan leaves with corresponding
-    //    streaming sources.
-    //
-    // 3. For each source, we sum the metrics of the associated execution plan leaves.
-    //
-    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
-      logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    val numInputRows = extractSourceToNumInputRows()
+
+    val eventTimeStats = lastExecution.executedPlan.collect {
+      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+        val stats = e.eventTimeStats.value
+        Map(
+          "max" -> stats.max,
+          "min" -> stats.min,
+          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
--- End diff --

This above code stayed the same. The diff is pretty dumb. 


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183660795
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
@@ -207,62 +209,92 @@ trait ProgressReporter extends Logging {
       return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
     }
 
-    // We want to associate execution plan leaves to sources that generate them, so that we match
-    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
-    // Consider the translation from the streaming logical plan to the final executed plan.
-    //
-    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
-    //
-    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
-    //    - Each logical plan leaf will be associated with a single streaming source.
-    //    - There can be multiple logical plan leaves associated with a streaming source.
-    //    - There can be leaves not associated with any streaming source, because they were
-    //      generated from a batch source (e.g. stream-batch joins)
-    //
-    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
-    //    the trigger logical plan, we associate executed plan leaves with corresponding
-    //    streaming sources.
-    //
-    // 3. For each source, we sum the metrics of the associated execution plan leaves.
-    //
-    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
-      logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    val numInputRows = extractSourceToNumInputRows()
+
+    val eventTimeStats = lastExecution.executedPlan.collect {
+      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+        val stats = e.eventTimeStats.value
+        Map(
+          "max" -> stats.max,
+          "min" -> stats.min,
+          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
+  }
+
+  /** Extract number of input sources for each streaming source in plan */
+  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
+
+    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
+      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
     }
-    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
-    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
-    val numInputRows: Map[BaseStreamingSource, Long] =
+
+    val onlyDataSourceV2Sources = {
+      // Check whether the streaming query's logical plan has only V2 data sources
+      val allStreamingLeaves =
+        logicalPlan.collect { case s: StreamingExecutionRelation => s }
+      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
--- End diff --

Yeah. This code path is not used by continuous processing. 


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183660623
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     assert(progress.sources(0).numInputRows === 10)
   }
 
+
+  test("input row calculation with trigger having data for one of two V2 sources") {
+    val streamInput1 = MemoryStream[Int]
+    val streamInput2 = MemoryStream[Int]
+
+    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
+      AddData(streamInput1, 1, 2, 3),
+      CheckAnswer(1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 2)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        assert(lastProgress.get.sources(1).numInputRows == 0)
+        true
+      }
+    )
+  }
+
+  test("input row calculation with mixed batch and streaming V2 sources") {
+
+    val streamInput = MemoryStream[Int]
+    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
+
+    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
+      AddData(streamInput, 1, 2, 3),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+
+        // The number of leaves in the trigger's logical plan should be same as the executed plan.
+        require(
+          q.lastExecution.logical.collectLeaves().length ==
+            q.lastExecution.executedPlan.collectLeaves().length)
+
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        true
+      }
+    )
+
+    val streamInput2 = MemoryStream[Int]
+    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
+
+    testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)(
--- End diff --

Turns out things were broken for self-joins and self-unions. Updated the logic and added tests for those for v2 sources.


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183605577
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     assert(progress.sources(0).numInputRows === 10)
   }
 
+
+  test("input row calculation with trigger having data for one of two V2 sources") {
+    val streamInput1 = MemoryStream[Int]
+    val streamInput2 = MemoryStream[Int]
+
+    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
+      AddData(streamInput1, 1, 2, 3),
+      CheckAnswer(1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 2)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        assert(lastProgress.get.sources(1).numInputRows == 0)
+        true
+      }
--- End diff --

nit: I'd suggest doing an AddData() for the other stream after, to make sure there's not some weird order dependence
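
For concreteness, the suggested follow-up could look something like this sketch (not part of the posted diff; it reuses the test's `streamInput1`/`streamInput2` and assumes the second `AddData` lands in its own trigger):

```scala
testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
  AddData(streamInput1, 1, 2, 3),
  CheckAnswer(1, 2, 3),
  // ... same first-trigger assertions as in the test above ...
  AddData(streamInput2, 4, 5),
  CheckAnswer(1, 2, 3, 4, 5),
  AssertOnQuery { q =>
    val lastProgress = getLastProgressWithData(q)
    assert(lastProgress.nonEmpty)
    assert(lastProgress.get.numInputRows == 2)
    assert(lastProgress.get.sources(0).numInputRows == 0) // no new data for streamInput1
    assert(lastProgress.get.sources(1).numInputRows == 2) // only streamInput2 had data
    true
  }
)
```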


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183604136
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
@@ -207,62 +209,92 @@ trait ProgressReporter extends Logging {
       return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
     }
 
-    // We want to associate execution plan leaves to sources that generate them, so that we match
-    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
-    // Consider the translation from the streaming logical plan to the final executed plan.
-    //
-    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
-    //
-    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
-    //    - Each logical plan leaf will be associated with a single streaming source.
-    //    - There can be multiple logical plan leaves associated with a streaming source.
-    //    - There can be leaves not associated with any streaming source, because they were
-    //      generated from a batch source (e.g. stream-batch joins)
-    //
-    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
-    //    the trigger logical plan, we associate executed plan leaves with corresponding
-    //    streaming sources.
-    //
-    // 3. For each source, we sum the metrics of the associated execution plan leaves.
-    //
-    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
-      logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    val numInputRows = extractSourceToNumInputRows()
+
+    val eventTimeStats = lastExecution.executedPlan.collect {
+      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+        val stats = e.eventTimeStats.value
+        Map(
+          "max" -> stats.max,
+          "min" -> stats.min,
+          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
+  }
+
+  /** Extract number of input sources for each streaming source in plan */
+  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
+
+    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
+      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
     }
-    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
-    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
-    val numInputRows: Map[BaseStreamingSource, Long] =
+
+    val onlyDataSourceV2Sources = {
+      // Check whether the streaming query's logical plan has only V2 data sources
+      val allStreamingLeaves =
+        logicalPlan.collect { case s: StreamingExecutionRelation => s }
+      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
--- End diff --

A point fix here won't be sufficient - right now the metrics don't make it 
to the driver at all in continuous processing.


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183559926
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
@@ -207,62 +209,92 @@ trait ProgressReporter extends Logging {
       return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
     }
 
-    // We want to associate execution plan leaves to sources that generate them, so that we match
-    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
-    // Consider the translation from the streaming logical plan to the final executed plan.
-    //
-    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
-    //
-    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
-    //    - Each logical plan leaf will be associated with a single streaming source.
-    //    - There can be multiple logical plan leaves associated with a streaming source.
-    //    - There can be leaves not associated with any streaming source, because they were
-    //      generated from a batch source (e.g. stream-batch joins)
-    //
-    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
-    //    the trigger logical plan, we associate executed plan leaves with corresponding
-    //    streaming sources.
-    //
-    // 3. For each source, we sum the metrics of the associated execution plan leaves.
-    //
-    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
-      logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    val numInputRows = extractSourceToNumInputRows()
+
+    val eventTimeStats = lastExecution.executedPlan.collect {
+      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+        val stats = e.eventTimeStats.value
+        Map(
+          "max" -> stats.max,
+          "min" -> stats.min,
+          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
+  }
+
+  /** Extract number of input sources for each streaming source in plan */
+  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
+
+    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
+      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
    }
-    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
-    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
-    val numInputRows: Map[BaseStreamingSource, Long] =
+
+    val onlyDataSourceV2Sources = {
+      // Check whether the streaming query's logical plan has only V2 data sources
+      val allStreamingLeaves =
+        logicalPlan.collect { case s: StreamingExecutionRelation => s }
+      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
--- End diff --

Maybe I can make it work for continuous as well with a small tweak.



---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183559395
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     assert(progress.sources(0).numInputRows === 10)
   }
 
+
+  test("input row calculation with trigger having data for one of two V2 sources") {
+    val streamInput1 = MemoryStream[Int]
+    val streamInput2 = MemoryStream[Int]
+
+    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
+      AddData(streamInput1, 1, 2, 3),
+      CheckAnswer(1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 2)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        assert(lastProgress.get.sources(1).numInputRows == 0)
+        true
+      }
+    )
+  }
+
+  test("input row calculation with mixed batch and streaming V2 sources") {
+
+    val streamInput = MemoryStream[Int]
+    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
+
+    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
+      AddData(streamInput, 1, 2, 3),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+
+        // The number of leaves in the trigger's logical plan should be same as the executed plan.
+        require(
+          q.lastExecution.logical.collectLeaves().length ==
+            q.lastExecution.executedPlan.collectLeaves().length)
+
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        true
+      }
+    )
+
+    val streamInput2 = MemoryStream[Int]
+    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
+
+    testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)(
--- End diff --

Then there will be two DataSourceV2ScanExecs reading from the same location. So we will be reading data twice, and the counts will reflect that. But yes, I should add a test for that.
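
For reference, a self-union test along those lines might look like the following sketch (an illustration, not the exact test added to the PR; it assumes the self-union is planned as two scans of the same MemoryStream, so the three added rows are counted twice):

```scala
test("input row calculation with same V2 source used twice in self-union") {
  val streamInput = MemoryStream[Int]

  testStream(streamInput.toDF().union(streamInput.toDF()), useV2Sink = true)(
    AddData(streamInput, 1, 2, 3),
    CheckAnswer(1, 1, 2, 2, 3, 3),
    AssertOnQuery { q =>
      val lastProgress = getLastProgressWithData(q)
      assert(lastProgress.nonEmpty)
      assert(lastProgress.get.numInputRows == 6) // 3 rows read by each of the two scans
      assert(lastProgress.get.sources.length == 1)
      assert(lastProgress.get.sources(0).numInputRows == 6)
      true
    }
  )
}
```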


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183559165
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     assert(progress.sources(0).numInputRows === 10)
   }
 
+
+  test("input row calculation with trigger having data for one of two V2 sources") {
+    val streamInput1 = MemoryStream[Int]
+    val streamInput2 = MemoryStream[Int]
+
+    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
+      AddData(streamInput1, 1, 2, 3),
+      CheckAnswer(1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 2)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        assert(lastProgress.get.sources(1).numInputRows == 0)
+        true
+      }
+    )
+  }
+
+  test("input row calculation with mixed batch and streaming V2 sources") {
+
+    val streamInput = MemoryStream[Int]
+    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
+
+    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
+      AddData(streamInput, 1, 2, 3),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+
+        // The number of leaves in the trigger's logical plan should be same as the executed plan.
+        require(
+          q.lastExecution.logical.collectLeaves().length ==
+            q.lastExecution.executedPlan.collectLeaves().length)
+
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        true
+      }
+    )
+
+    val streamInput2 = MemoryStream[Int]
+    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
--- End diff --

really doesn't matter as the test suite will shut down the SparkContext anyways.


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183542367
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     assert(progress.sources(0).numInputRows === 10)
   }
 
+
+  test("input row calculation with trigger having data for one of two V2 sources") {
+    val streamInput1 = MemoryStream[Int]
+    val streamInput2 = MemoryStream[Int]
+
+    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
+      AddData(streamInput1, 1, 2, 3),
+      CheckAnswer(1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 2)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        assert(lastProgress.get.sources(1).numInputRows == 0)
+        true
+      }
+    )
+  }
+
+  test("input row calculation with mixed batch and streaming V2 sources") {
+
+    val streamInput = MemoryStream[Int]
+    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
+
+    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
+      AddData(streamInput, 1, 2, 3),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+
+        // The number of leaves in the trigger's logical plan should be same as the executed plan.
+        require(
+          q.lastExecution.logical.collectLeaves().length ==
+            q.lastExecution.executedPlan.collectLeaves().length)
+
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        true
+      }
+    )
+
+    val streamInput2 = MemoryStream[Int]
+    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
+
+    testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)(
--- End diff --

e.g. self-join?


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183533297
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
@@ -207,62 +209,92 @@ trait ProgressReporter extends Logging {
       return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
     }
 
-    // We want to associate execution plan leaves to sources that generate them, so that we match
-    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
-    // Consider the translation from the streaming logical plan to the final executed plan.
-    //
-    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
-    //
-    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
-    //    - Each logical plan leaf will be associated with a single streaming source.
-    //    - There can be multiple logical plan leaves associated with a streaming source.
-    //    - There can be leaves not associated with any streaming source, because they were
-    //      generated from a batch source (e.g. stream-batch joins)
-    //
-    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
-    //    the trigger logical plan, we associate executed plan leaves with corresponding
-    //    streaming sources.
-    //
-    // 3. For each source, we sum the metrics of the associated execution plan leaves.
-    //
-    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
-      logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    val numInputRows = extractSourceToNumInputRows()
+
+    val eventTimeStats = lastExecution.executedPlan.collect {
+      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+        val stats = e.eventTimeStats.value
+        Map(
+          "max" -> stats.max,
+          "min" -> stats.min,
+          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
+  }
+
+  /** Extract number of input sources for each streaming source in plan */
+  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
+
+    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
+      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
     }
-    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
-    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
-    val numInputRows: Map[BaseStreamingSource, Long] =
+
+    val onlyDataSourceV2Sources = {
+      // Check whether the streaming query's logical plan has only V2 data sources
+      val allStreamingLeaves =
+        logicalPlan.collect { case s: StreamingExecutionRelation => s }
+      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
--- End diff --

we don't have a way to track these for ContinuousProcessing at the moment?


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183542307
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     assert(progress.sources(0).numInputRows === 10)
   }
 
+
+  test("input row calculation with trigger having data for one of two V2 sources") {
+    val streamInput1 = MemoryStream[Int]
+    val streamInput2 = MemoryStream[Int]
+
+    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
+      AddData(streamInput1, 1, 2, 3),
+      CheckAnswer(1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 2)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        assert(lastProgress.get.sources(1).numInputRows == 0)
+        true
+      }
+    )
+  }
+
+  test("input row calculation with mixed batch and streaming V2 sources") {
+
+    val streamInput = MemoryStream[Int]
+    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
+
+    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
+      AddData(streamInput, 1, 2, 3),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+
+        // The number of leaves in the trigger's logical plan should be same as the executed plan.
+        require(
+          q.lastExecution.logical.collectLeaves().length ==
+            q.lastExecution.executedPlan.collectLeaves().length)
+
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        true
+      }
+    )
+
+    val streamInput2 = MemoryStream[Int]
+    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
+
+    testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)(
--- End diff --

what if you do a stream-stream join?
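
A stream-stream join variant could be sketched like this (hypothetical, not in the posted diff; with the per-scan attribution each source should report its own rows, and the per-source counts should add up to the trigger's total):

```scala
val streamA = MemoryStream[Int]
val streamB = MemoryStream[Int]

testStream(streamA.toDF().join(streamB.toDF(), "value"), useV2Sink = true)(
  AddData(streamA, 1, 2, 3),
  AddData(streamB, 2, 3, 4),
  CheckAnswer(2, 3),
  AssertOnQuery { q =>
    val lastProgress = getLastProgressWithData(q)
    assert(lastProgress.nonEmpty)
    assert(lastProgress.get.sources.length == 2)
    // Per-source counts should sum to the reported total for the trigger.
    assert(lastProgress.get.sources.map(_.numInputRows).sum == lastProgress.get.numInputRows)
    true
  }
)
```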


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183542025
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -733,6 +804,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  def getLastProgressWithData(q: StreamingQuery): Option[StreamingQueryProgress] = {
+    q.recentProgress.filter(_.numInputRows > 0).lastOption
+  }
+
+
--- End diff --

nit: extra line


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21126#discussion_r183541978
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     assert(progress.sources(0).numInputRows === 10)
   }
 
+
+  test("input row calculation with trigger having data for one of two V2 sources") {
+    val streamInput1 = MemoryStream[Int]
+    val streamInput2 = MemoryStream[Int]
+
+    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
+      AddData(streamInput1, 1, 2, 3),
+      CheckAnswer(1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 2)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        assert(lastProgress.get.sources(1).numInputRows == 0)
+        true
+      }
+    )
+  }
+
+  test("input row calculation with mixed batch and streaming V2 sources") {
+
+    val streamInput = MemoryStream[Int]
+    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
+
+    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
+      AddData(streamInput, 1, 2, 3),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+
+        // The number of leaves in the trigger's logical plan should be same as the executed plan.
+        require(
+          q.lastExecution.logical.collectLeaves().length ==
+            q.lastExecution.executedPlan.collectLeaves().length)
+
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        true
+      }
+    )
+
+    val streamInput2 = MemoryStream[Int]
+    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
--- End diff --

nit: unpersist later?


---




[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

2018-04-23 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/21126

[SPARK-24050][SS] Calculate input / processing rates correctly for 
DataSourceV2 streaming sources

## What changes were proposed in this pull request?

In some streaming queries, the input and processing rates are not calculated at all (they show up as zero) because MicroBatchExecution fails to associate metrics from the executed plan of a trigger with the sources in the logical plan of the trigger. This executed-plan-leaf-to-logical-source attribution works as follows. With V1 sources, there was no way to identify which execution plan leaves were generated by a streaming source, so we made a best-effort attempt to match logical and execution plan leaves when the number of leaves was the same. In cases where the number of leaves differs, we just give up and report zero rates. An example where this may happen is as follows.

```
val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
val streamingInputDF = ...

val query = streamingInputDF.join(cachedStaticDF).writeStream
```
In this case, the `cachedStaticDF` has multiple logical leaves, but in the trigger's execution plan it only has one leaf because a cached subplan is represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in the number of leaves, causing the input rates to be computed as zero.

With DataSourceV2, all inputs are represented in the executed plan using `DataSourceV2ScanExec`, each of which has a reference to the associated logical `DataSource` and `DataSourceReader`. So it's easy to associate the metrics to the original streaming sources.

In this PR, the solution is as follows. If all the streaming sources in a streaming query are v2 sources, then we use a new code path where the execution-metrics-to-source mapping is done directly. Otherwise we fall back to the existing mapping logic.
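
For illustration, the direct mapping for the all-V2 case looks roughly like the sketch below (simplified; `sumRows`, `BaseStreamingSource`, and `DataSourceV2ScanExec` appear in the diff, while the exact `numOutputRows` metric lookup is an assumption about the scan node's SQLMetrics):

```scala
// Sketch of the V2-only path (simplified from the actual change): dedup
// DataSourceV2ScanExec leaves by object identity, then sum rows per source.
import java.util.IdentityHashMap
import scala.collection.JavaConverters._

def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] =
  tuples.groupBy(_._1).mapValues(_.map(_._2).sum)

// Identity-based keys avoid both double counting (the same instance reachable
// from two places in the plan) and undercounting (distinct instances with equal
// hashcodes, e.g. self-unions).
val uniqueStreamingExecLeaves = new IdentityHashMap[DataSourceV2ScanExec, DataSourceV2ScanExec]()
lastExecution.executedPlan.collectLeaves().foreach {
  case s: DataSourceV2ScanExec if s.reader.isInstanceOf[BaseStreamingSource] =>
    uniqueStreamingExecLeaves.put(s, s)
  case _ =>
}

// Attribute each unique scan's row count metric to the source that produced it.
val sourceToInputRows = sumRows(uniqueStreamingExecLeaves.values.asScala.toSeq.map { leaf =>
  val source = leaf.reader.asInstanceOf[BaseStreamingSource]
  source -> leaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
})
```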

## How was this patch tested?
- New unit tests using V2 memory source
- Existing unit tests using V1 source


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-24050

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21126.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21126


commit d485db8ec70a8bd8e2fff488e75be08d384ceef0
Author: Tathagata Das 
Date:   2018-04-23T06:20:14Z

SPARK-24050




---
