[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19239


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139259901
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstDf = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondDf = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "5 seconds")
+      .select('value)
+
+    val union = firstDf.union(secondDf)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def getWatermarkAfterData(
+        firstData: Seq[Int] = Seq.empty,
+        secondData: Seq[Int] = Seq.empty): Long = {
+      if (firstData.nonEmpty) first.addData(firstData)
+      if (secondData.nonEmpty) second.addData(secondData)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      first.addData(0)
+      union.processAllAvailable()
+      // get last watermark
+      val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
+      lastExecution.offsetSeqMetadata.batchWatermarkMs
+    }
+
+    // Global watermark starts at 0 until we get data from both sides
+    assert(getWatermarkAfterData(firstData = Seq(11)) == 0)
+    assert(getWatermarkAfterData(secondData = Seq(6)) == 1000)
+    // Global watermark stays at left watermark 1 when right watermark moves to 2
+    assert(getWatermarkAfterData(secondData = Seq(8)) == 1000)
+    // Global watermark switches to right side value 2 when left watermark goes higher
+    assert(getWatermarkAfterData(firstData = Seq(21)) == 3000)
+    // Global watermark goes back to left
+    assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000)
+    // Global watermark stays on left as long as it's below right
+    assert(getWatermarkAfterData(firstData = Seq(31)) == 21000)
+    assert(getWatermarkAfterData(firstData = Seq(41)) == 31000)
+    // Global watermark switches back to right again
+    assert(getWatermarkAfterData(firstData = Seq(51)) == 34000)
+
+    // Global watermark is updated correctly with simultaneous data from both sides
+    assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000)
+    assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000)
+    assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000)
+
+    // Global watermark doesn't decrement with simultaneous data
+    assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000)
+    assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000)
+    assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000)
--- End diff --

test recovery of the minimum after a restart.
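A hedged sketch of what such a restart test might look like, reusing the streams and the `getWatermarkAfterData` helper from the test above; the checkpoint path and restart wiring here are illustrative assumptions, not the code that was merged:

```
// Establish a global watermark (the min of both sides), then stop the query
// and restart it, checking that the minimum is recovered from the persisted
// metadata rather than recomputed from scratch.
assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000)
union.stop()

// `checkpointDir` is a hypothetical checkpoint location the query would have
// to be started with for recovery to work.
val restarted = firstDf.union(secondDf)
  .writeStream
  .format("memory")
  .queryName("test2")
  .option("checkpointLocation", checkpointDir)
  .start()

// After restart the recovered global watermark should still be 90000: it must
// not jump to either side's individual watermark, and it must not regress.
```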


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread joseph-torres
GitHub user joseph-torres reopened a pull request:

https://github.com/apache/spark/pull/19239

[SPARK-22017] Take minimum of all watermark execs in StreamExecution.

## What changes were proposed in this pull request?

Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.
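
In other words, instead of reading the first EventTimeWatermarkExec found in the plan, the query watermark is derived along these lines (a simplified sketch of the approach discussed in the diffs below, not the verbatim StreamExecution code):

```
// Track the max observed watermark per EventTimeWatermarkExec, keyed by the
// operator's position in the physical plan.
lastExecution.executedPlan.collect {
  case e: EventTimeWatermarkExec => e
}.zipWithIndex.foreach {
  case (e, index) if e.eventTimeStats.value.count > 0 =>
    val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
    // an individual operator's watermark never moves backwards
    if (watermarkMsMap.get(index).forall(_ < newWatermarkMs)) {
      watermarkMsMap.put(index, newWatermarkMs)
    }
  case _ => // this operator has seen no data yet; leave its entry alone
}
// The query's global watermark is the minimum across operators, which is the
// safe choice: no event can then be late with respect to any input stream.
if (watermarkMsMap.nonEmpty) {
  batchWatermarkMs = watermarkMsMap.values.min
}
```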

## How was this patch tested?

new unit test


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-22017

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19239


commit 4a7f53fdab1e5e640e156a4a3d2ba27837195195
Author: Jose Torres 
Date:   2017-09-13T21:49:23Z

Implement multiple watermark StreamExecution support.

commit 9b9cd19106fae9a2de268eb2b559ca1bf159e9c2
Author: Jose Torres 
Date:   2017-09-14T18:30:40Z

partially fix test

commit 6a4c80b696f42a445c7f846fada3f823e04bd3ab
Author: Jose Torres 
Date:   2017-09-14T21:52:16Z

Finish rewriting test

commit 484940e5eb4d1eac1c5ec81f475681c9241bbab2
Author: Jose Torres 
Date:   2017-09-14T22:24:36Z

make IncrementalExecution.offsetSeqMetadata non-private

commit 032f55503c8d424390da1ff85054e3a01e7489eb
Author: Jose Torres 
Date:   2017-09-14T23:22:22Z

properly name test dataframes

commit d7f5f60c6be5bf228c960c3549eb81ed869f0227
Author: Jose Torres 
Date:   2017-09-14T23:39:22Z

Combine test helper functions.

commit 2f07f90423d87985322975f8ad5aef8f70f28066
Author: Jose Torres 
Date:   2017-09-15T01:18:12Z

Key watermarks by relative position rather than attribute.

commit 8b605384d77fdeb63b28feabee74284a5ab1409a
Author: Jose Torres 
Date:   2017-09-15T02:05:14Z

Address test comments.




---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread joseph-torres
Github user joseph-torres closed the pull request at:

https://github.com/apache/spark/pull/19239


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139077988
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,85 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstDf = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondDf = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "5 seconds")
+      .select('value)
+
+    val union = firstDf.union(secondDf)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def generateAndAssertNewWatermark(
+        stream: MemoryStream[Int],
+        data: Seq[Int],
+        watermark: Int): Unit = {
+      stream.addData(data)
+      assertWatermark(watermark)
+    }
+
+    def assertWatermark(watermark: Int) {
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      first.addData(0)
+      union.processAllAvailable()
+
+      val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
+      assert(lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark)
+    }
+
+    generateAndAssertNewWatermark(first, Seq(11), 1000)
--- End diff --

BTW, why is the first watermark at 1000?
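
(The arithmetic, presumably: the first stream's max event time is 11 s and its watermark delay is 10 s, so its watermark is 11000 - 10000 = 1000 ms; the second stream has seen no data yet, so in this revision of the test it does not yet factor into the minimum.)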


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139077656
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,85 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstDf = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondDf = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "5 seconds")
+      .select('value)
+
+    val union = firstDf.union(secondDf)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def generateAndAssertNewWatermark(
+        stream: MemoryStream[Int],
+        data: Seq[Int],
+        watermark: Int): Unit = {
+      stream.addData(data)
+      assertWatermark(watermark)
+    }
+
+    def assertWatermark(watermark: Int) {
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      first.addData(0)
+      union.processAllAvailable()
+
+      val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
+      assert(lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark)
+    }
+
+    generateAndAssertNewWatermark(first, Seq(11), 1000)
--- End diff --

instead of having two variations for handling single and double inputs, you can do something like this:
```
def getWatermarkAfterData(
    firstData: Seq[Int] = Seq.empty,
    secondData: Seq[Int] = Seq.empty): Long = {
  if (firstData.nonEmpty) first.addData(firstData)
  if (secondData.nonEmpty) second.addData(secondData)
  union.processAllAvailable()
  // add a dummy batch so lastExecution has the new watermark
  first.addData(0)
  union.processAllAvailable()
  // get updated watermark
  val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
  lastExecution.offsetSeqMetadata.batchWatermarkMs
}

assert(getWatermarkAfterData(firstData = Seq(...)) === 1)
assert(getWatermarkAfterData(secondData = Seq(...)) === 1)
assert(getWatermarkAfterData(firstData = Seq(...), secondData = Seq(...)) === 1)
```



---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139076625
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,25 @@ class StreamExecution(
     }
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
-          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+          case e: EventTimeWatermarkExec => e
+        }.zipWithIndex.foreach {
+          case (e, index) if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-            e.eventTimeStats.value.max - e.delayMs
-        }.headOption.foreach { newWatermarkMs =>
+            val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+            val mappedWatermarkMs: Option[Long] = watermarkMsMap.get(index)
+            if (mappedWatermarkMs.isEmpty || newAttributeWatermarkMs > mappedWatermarkMs.get) {
+              watermarkMsMap.put(index, newAttributeWatermarkMs)
+            }
+
+          case _ =>
+        }
+
+        // Update the query watermark to the minimum of all attribute watermarks.
--- End diff --

Add a little bit more docs saying that this is the safe thing to do, as it preserves the watermark guarantees.
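
A sketch of the kind of comment being asked for (illustrative wording, not the text that was eventually merged):

```
// Update the query watermark to the minimum of all attribute watermarks.
// This is the safe value: a watermark promises that no data older than it
// will arrive again, and for a query with multiple watermarked inputs that
// promise only holds if we advance no faster than the slowest input.
```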


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139070352
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,25 @@ class StreamExecution(
     }
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
-          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+          case e: EventTimeWatermarkExec => e
+        }.zipWithIndex.foreach {
+          case (e, index) if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-            e.eventTimeStats.value.max - e.delayMs
-        }.headOption.foreach { newWatermarkMs =>
+            val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+            val mappedWatermarkMs: Option[Long] = watermarkMsMap.get(index)
--- End diff --

`mappedWatermarkMs` -> `previousWatermarkMs` would be more semantically meaningful.


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139070245
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,25 @@ class StreamExecution(
     }
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
-          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+          case e: EventTimeWatermarkExec => e
+        }.zipWithIndex.foreach {
+          case (e, index) if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-            e.eventTimeStats.value.max - e.delayMs
-        }.headOption.foreach { newWatermarkMs =>
+            val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
--- End diff --

also... `newAttributeWatermarkMs` -> `newWatermarkMs`. Why "attribute"?


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139070184
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,25 @@ class StreamExecution(
     }
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
-          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+          case e: EventTimeWatermarkExec => e
+        }.zipWithIndex.foreach {
+          case (e, index) if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-            e.eventTimeStats.value.max - e.delayMs
-        }.headOption.foreach { newWatermarkMs =>
+            val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+            val mappedWatermarkMs: Option[Long] = watermarkMsMap.get(index)
--- End diff --

nit: Option[Long] is not needed.
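
That is, the explicit annotation can simply be dropped, or the Option avoided altogether; a sketch of the latter with equivalent logic:

```
val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
// forall is true when the key is absent, so this covers both the
// "no previous watermark" and "new watermark is larger" cases
if (watermarkMsMap.get(index).forall(_ < newWatermarkMs)) {
  watermarkMsMap.put(index, newWatermarkMs)
}
```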


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139070141
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,25 @@ class StreamExecution(
     }
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
-          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+          case e: EventTimeWatermarkExec => e
+        }.zipWithIndex.foreach {
+          case (e, index) if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-            e.eventTimeStats.value.max - e.delayMs
-        }.headOption.foreach { newWatermarkMs =>
+            val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
--- End diff --

Add `e` or `index` to the logDebug to differentiate between the different operators.
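
For instance (an illustrative one-liner, not necessarily the exact message that was merged):

```
logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
```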


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139069900
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,24 @@ class StreamExecution(
     }
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
--- End diff --

Yeah. I had made this comment when I was thinking that we don't need the mutable map. Ignore this.


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139069662
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -130,6 +130,13 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
     batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)
 
+  /**
+   * A map of current watermarks, keyed by the position of the watermark operator in the
+   * physical plan. The minimum watermark timestamp present here will be used and persisted as the
+   * query's watermark when preparing each batch, so it's ok that this val isn't fault-tolerant.
--- End diff --

Make it clear that "this state is soft state and does not require persistence, because the minimum watermark is what gets persisted in offsetSeqMetadata".
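
A sketch of how that doc comment might read (illustrative wording under the assumption above, not the merged text):

```
/**
 * A map of current watermarks, keyed by the position of the watermark
 * operator in the physical plan.
 *
 * This is soft state and does not require persistence: on restart the map is
 * rebuilt as new data arrives, while the global minimum (the only value that
 * matters for correctness) is already persisted in offsetSeqMetadata and can
 * only increase from there.
 */
```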


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139036524
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
+    assertQueryWatermark(1000)
+
+    // Watermark stays at 1 from the left when right watermark moves to 2
--- End diff --

nit: change "Watermark" to "global watermark", to differentiate it from the "right watermark" later in the sentence.


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139036217
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
--- End diff --

You are right. What you have is better.



---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139036315
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
--- End diff --

Can you update the test to have different watermark delays, so that we test that we are choosing not the min delay, but the min watermark?
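
(To spell out why differing delays matter: with identical 10 s delays, the operator with the smaller watermark is always the one that saw the smaller max event time, so a bug that picked the min delay instead of the min watermark would be invisible. With 10 s and 5 s delays the final test can tell them apart; e.g. after 21 on the first stream and 8 on the second, the per-operator watermarks are 21000 - 10000 = 11000 and 8000 - 5000 = 3000, and the asserted global watermark is their minimum, 3000.)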


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139035990
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
--- End diff --

this line break is hard to read. How about breaking it with an intermediate variable? (e.g. `val lastExecution = ...; assert(...)`)
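
Concretely, something like this (a sketch, and essentially the shape the test settled on):

```
val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
assert(lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark)
```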


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139035867
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
--- End diff --

nit: assertWatermark.


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139034057
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -130,6 +130,13 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
     batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)
 
+  /**
+   * A map from watermarked attributes to their current watermark. The minimum watermark
+   * timestamp present here will be used as the overall query watermark in offsetSeqMetadata;
+   * the query watermark is what's logged and used to age out old state.
+   */
+  protected var attributeWatermarkMsMap: AttributeMap[Long] = AttributeMap(Seq())
--- End diff --

I see. I was mistaken. However, in that case, use a mutable.HashMap instead of a var. The code style we use with Scala is not to use vars unless absolutely needed.
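
I.e., something along these lines (a sketch of the suggested style; per a later commit in this PR, the map ended up keyed by the operator's position in the plan rather than by attribute):

```
import scala.collection.mutable

// a val holding a mutable map: the binding never changes, only the contents
protected val watermarkMsMap: mutable.Map[Int, Long] = mutable.HashMap[Int, Long]()
```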


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139033482
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
--- End diff --

even better example - 
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L107



---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139033208
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
--- End diff --

there is no aggregation here.


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139029401
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,24 @@ class StreamExecution(
     }
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
--- End diff --

Well, we're updating multiple watermarks in the map. We later update 
`offsetSeqMetadata` with the new minimum one, but that's not in this block.


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139029187
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -130,6 +130,13 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
     batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)
 
+  /**
+   * A map from watermarked attributes to their current watermark. The minimum watermark
+   * timestamp present here will be used as the overall query watermark in offsetSeqMetadata;
+   * the query watermark is what's logged and used to age out old state.
+   */
+  protected var attributeWatermarkMsMap: AttributeMap[Long] = AttributeMap(Seq())
--- End diff --

This map has to persist and get updated across batches, and I'm not sure 
how to do that with a local variable or a val.


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139028613
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
--- End diff --

The problem is that watermark recalculation happens at the beginning of 
each batch, and to sequence executions I have to call CheckData or 
CheckLastBatch. So that method ends up producing a test multiple times longer, 
since a single entry is:

AddData(realData)
CheckLastBatch
AddData(0)
CheckLastBatch
AssertOnQuery


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139026379
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
--- End diff --

Also, I think you can use the `testStream..AddData...AssertOnQuery` pattern. It's cleaner.

https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala#L180
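
For reference, that pattern looks roughly like this (a hedged sketch using StreamTest actions from the suite linked above; note `union` would have to be the un-started DataFrame rather than the started query, and the expected watermark value is illustrative):

```
testStream(union)(
  AddData(first, 11),
  CheckLastBatch(11),
  AddData(second, 6),
  CheckLastBatch(6),
  // as discussed elsewhere in this thread, an extra dummy batch may be
  // needed before the updated watermark shows up in lastExecution
  AssertOnQuery { q =>
    q.lastExecution.offsetSeqMetadata.batchWatermarkMs == 1000
  }
)
```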


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139026066
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
--- End diff --

If these are always used together, then these functions can be merged, right?


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139025952
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,24 @@ class StreamExecution(
     }
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
--- End diff --

It's still a single watermark that is being updated; it just happens to be updated using multiple watermarks.


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139025814
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -130,6 +130,13 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
     batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)
 
+  /**
+   * A map from watermarked attributes to their current watermark. The minimum watermark
+   * timestamp present here will be used as the overall query watermark in offsetSeqMetadata;
+   * the query watermark is what's logged and used to age out old state.
+   */
+  protected var attributeWatermarkMsMap: AttributeMap[Long] = AttributeMap(Seq())
--- End diff --

This is needed only inside a single function, right? So make it a local variable. Even better, if you don't have to make it a var, make it a val in the usual functional way.





---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19239#discussion_r139025554
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
@@ -39,7 +39,7 @@ class IncrementalExecution(
     val checkpointLocation: String,
     val runId: UUID,
     val currentBatchId: Long,
-    offsetSeqMetadata: OffsetSeqMetadata)
+    private[sql] val offsetSeqMetadata: OffsetSeqMetadata)
--- End diff --

Just make it val. Anything inside sql.execution does not show up in docs, and therefore we just keep it val so that we can debug when we need to dig deep.


---




[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...

2017-09-14 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19239

[SPARK-22017] Take minimum of all watermark execs in StreamExecution.

## What changes were proposed in this pull request?

Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.

## How was this patch tested?

new unit test


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-22017

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19239


commit 4a7f53fdab1e5e640e156a4a3d2ba27837195195
Author: Jose Torres 
Date:   2017-09-13T21:49:23Z

Implement multiple watermark StreamExecution support.

commit 9b9cd19106fae9a2de268eb2b559ca1bf159e9c2
Author: Jose Torres 
Date:   2017-09-14T18:30:40Z

partially fix test

commit 6a4c80b696f42a445c7f846fada3f823e04bd3ab
Author: Jose Torres 
Date:   2017-09-14T21:52:16Z

Finish rewriting test




---
