[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19239

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139259901

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     }

+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstDf = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondDf = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "5 seconds")
+      .select('value)
+
+    val union = firstDf.union(secondDf)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def getWatermarkAfterData(
+        firstData: Seq[Int] = Seq.empty,
+        secondData: Seq[Int] = Seq.empty): Long = {
+      if (firstData.nonEmpty) first.addData(firstData)
+      if (secondData.nonEmpty) second.addData(secondData)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      first.addData(0)
+      union.processAllAvailable()
+      // get last watermark
+      val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
+      lastExecution.offsetSeqMetadata.batchWatermarkMs
+    }
+
+    // Global watermark starts at 0 until we get data from both sides
+    assert(getWatermarkAfterData(firstData = Seq(11)) == 0)
+    assert(getWatermarkAfterData(secondData = Seq(6)) == 1000)
+    // Global watermark stays at left watermark 1 when right watermark moves to 3
+    assert(getWatermarkAfterData(secondData = Seq(8)) == 1000)
+    // Global watermark switches to right side value 3 when left watermark goes higher
+    assert(getWatermarkAfterData(firstData = Seq(21)) == 3000)
+    // Global watermark goes back to left
+    assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000)
+    // Global watermark stays on left as long as it's below right
+    assert(getWatermarkAfterData(firstData = Seq(31)) == 21000)
+    assert(getWatermarkAfterData(firstData = Seq(41)) == 31000)
+    // Global watermark switches back to right again
+    assert(getWatermarkAfterData(firstData = Seq(51)) == 34000)
+
+    // Global watermark is updated correctly with simultaneous data from both sides
+    assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000)
+    assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000)
+    assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000)
+
+    // Global watermark doesn't decrement with simultaneous data
+    assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000)
+    assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000)
+    assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000)
--- End diff --

Test recovery of the minimum after a restart.
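The expected values in this test can be reproduced without Spark. The sketch below is a hypothetical model, not the code under review: `MinWatermarkTracker` and `WatermarkReplay` are illustrative names. It assumes each operator's watermark is its max event time minus its delay and never moves backwards, that an operator with no data yet counts as 0 (matching the "starts at 0 until we get data from both sides" assertion), and that the global watermark is the minimum of the operator watermarks and also never moves backwards. The dummy `first.addData(0)` batches are left out because an event time of 0 cannot raise the first stream's watermark.

```scala
import scala.collection.mutable

// Hypothetical model of the watermark bookkeeping discussed in this PR;
// not Spark's StreamExecution code.
class MinWatermarkTracker(delaysMs: Seq[Long]) {
  // Per-operator watermark, keyed by the operator's position in the plan.
  private val operatorWatermarkMs = mutable.HashMap.empty[Int, Long]
  // Global (query) watermark; assumed to be the only persisted value.
  private var globalWatermarkMs = 0L

  /** maxEventTimeMs(i) is the max event time seen by operator i in the last batch, if any. */
  def update(maxEventTimeMs: Seq[Option[Long]]): Long = {
    maxEventTimeMs.zip(delaysMs).zipWithIndex.foreach {
      case ((Some(maxMs), delayMs), index) =>
        val newWatermarkMs = maxMs - delayMs
        // An individual operator's watermark never moves backwards.
        if (operatorWatermarkMs.get(index).forall(newWatermarkMs > _)) {
          operatorWatermarkMs(index) = newWatermarkMs
        }
      case ((None, _), index) =>
        // Assumption: an operator with no data yet counts as 0, which holds the
        // global watermark at 0 until every input has produced data.
        if (!operatorWatermarkMs.contains(index)) operatorWatermarkMs(index) = 0L
    }
    if (operatorWatermarkMs.nonEmpty) {
      // The minimum never exceeds any single operator's watermark.
      val minMs = operatorWatermarkMs.values.min
      if (minMs > globalWatermarkMs) globalWatermarkMs = minMs
    }
    globalWatermarkMs
  }
}

object WatermarkReplay {
  // The test casts Int values to timestamps, i.e. seconds; convert to ms.
  private def at(seconds: Int): Option[Long] = Some(seconds * 1000L)

  /** Each entry is (first stream max, second stream max) for one helper call. */
  def replay(): Seq[Long] = {
    val tracker = new MinWatermarkTracker(Seq(10000L, 5000L)) // "10 seconds", "5 seconds"
    val batches: Seq[(Option[Long], Option[Long])] = Seq(
      (at(11), None), (None, at(6)), (None, at(8)), (at(21), None),
      (None, at(39)), // Seq(17, 28, 39): only the max matters
      (at(31), None), (at(41), None), (at(51), None),
      (at(100), at(100)), (at(120), at(110)), (at(130), at(125)),
      (at(100), at(100)), (at(140), at(100)), (at(100), at(135)))
    batches.map { case (first, second) => tracker.update(Seq(first, second)) }
  }

  def main(args: Array[String]): Unit = println(replay().mkString(", "))
}
```

Running `WatermarkReplay.main` prints `0, 1000, 1000, 3000, 11000, 21000, 31000, 34000, 90000, 105000, 120000, 120000, 120000, 130000`, the same sequence the assertions expect.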
GitHub user joseph-torres reopened a pull request: https://github.com/apache/spark/pull/19239

[SPARK-22017] Take minimum of all watermark execs in StreamExecution.

## What changes were proposed in this pull request?

Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.

## How was this patch tested?

New unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-22017

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19239.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #19239

commit 4a7f53fdab1e5e640e156a4a3d2ba27837195195
Author: Jose Torres
Date: 2017-09-13T21:49:23Z

    Implement multiple watermark StreamExecution support.

commit 9b9cd19106fae9a2de268eb2b559ca1bf159e9c2
Author: Jose Torres
Date: 2017-09-14T18:30:40Z

    partially fix test

commit 6a4c80b696f42a445c7f846fada3f823e04bd3ab
Author: Jose Torres
Date: 2017-09-14T21:52:16Z

    Finish rewriting test

commit 484940e5eb4d1eac1c5ec81f475681c9241bbab2
Author: Jose Torres
Date: 2017-09-14T22:24:36Z

    make IncrementalExecution.offsetSeqMetadata non-private

commit 032f55503c8d424390da1ff85054e3a01e7489eb
Author: Jose Torres
Date: 2017-09-14T23:22:22Z

    properly name test dataframes

commit d7f5f60c6be5bf228c960c3549eb81ed869f0227
Author: Jose Torres
Date: 2017-09-14T23:39:22Z

    Combine test helper functions.

commit 2f07f90423d87985322975f8ad5aef8f70f28066
Author: Jose Torres
Date: 2017-09-15T01:18:12Z

    Key watermarks by relative position rather than attribute.

commit 8b605384d77fdeb63b28feabee74284a5ab1409a
Author: Jose Torres
Date: 2017-09-15T02:05:14Z

    Address test comments.
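To see why picking one watermark arbitrarily is unsafe, take the state the test reaches after `firstData = Seq(21)`: the left watermark is 11000 ms (21 s minus 10 s) and the right one is 3000 ms (8 s minus 5 s). The old code took `.headOption` of the collected watermarks; if the left operator happens to be collected first, a right-side event at 4 s would be treated as late even though it is within the right stream's own 5-second delay. This is a hypothetical sketch: `PickWatermark` is an illustrative name, and it assumes a row counts as late when its event time is strictly below the watermark.

```scala
// Hypothetical illustration; PickWatermark is not a Spark API.
object PickWatermark {
  /** Old behaviour, sketched: whichever watermark operator happens to be collected first. */
  def arbitrary(operatorWatermarksMs: Seq[Long]): Option[Long] =
    operatorWatermarksMs.headOption

  /** Behaviour proposed in this PR: the minimum over all watermark operators. */
  def minimum(operatorWatermarksMs: Seq[Long]): Option[Long] =
    if (operatorWatermarksMs.isEmpty) None else Some(operatorWatermarksMs.min)

  /** Assumption: a row counts as late when its event time is strictly below the watermark. */
  def isLate(eventTimeMs: Long, watermarkMs: Long): Boolean = eventTimeMs < watermarkMs

  def main(args: Array[String]): Unit = {
    // Left: max 21 s, delay 10 s -> 11000 ms. Right: max 8 s, delay 5 s -> 3000 ms.
    val watermarks = Seq(11000L, 3000L)
    val rightEventMs = 4000L // within the right stream's own 5 s delay
    println(arbitrary(watermarks).map(isLate(rightEventMs, _))) // Some(true): wrongly late
    println(minimum(watermarks).map(isLate(rightEventMs, _)))   // Some(false)
  }
}
```

Taking the minimum means the global watermark never runs ahead of any individual operator, so no input loses data it would have kept if it were the only stream in the query.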
Github user joseph-torres closed the pull request at: https://github.com/apache/spark/pull/19239
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139077988

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,85 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     }

+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstDf = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondDf = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "5 seconds")
+      .select('value)
+
+    val union = firstDf.union(secondDf)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def generateAndAssertNewWatermark(
+        stream: MemoryStream[Int],
+        data: Seq[Int],
+        watermark: Int): Unit = {
+      stream.addData(data)
+      assertWatermark(watermark)
+    }
+
+    def assertWatermark(watermark: Int) {
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      first.addData(0)
+      union.processAllAvailable()
+
+      val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
+      assert(lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark)
+    }
+
+    generateAndAssertNewWatermark(first, Seq(11), 1000)
--- End diff --

BTW, why is the first watermark at 1000?
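The 1000 follows from the first stream alone: its max event time is 11 s and its delay is 10 s. The second stream has produced no data at that point, so there is no second watermark to take the minimum with. Assuming, as the later revision of the test does with its `== 0` assertion, that a stream with no data yet contributes 0, the arithmetic is:

```latex
\begin{aligned}
w_{\text{first}}  &= \max(\text{eventTime}) - \text{delay} = 11000 - 10000 = 1000~\text{ms} \\
w_{\text{global}} &= \min(w_{\text{first}},\, w_{\text{second}}) = \min(1000,\, 0) = 0~\text{ms}
\end{aligned}
```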
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139077656

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,85 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     }

+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstDf = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondDf = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "5 seconds")
+      .select('value)
+
+    val union = firstDf.union(secondDf)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def generateAndAssertNewWatermark(
+        stream: MemoryStream[Int],
+        data: Seq[Int],
+        watermark: Int): Unit = {
+      stream.addData(data)
+      assertWatermark(watermark)
+    }
+
+    def assertWatermark(watermark: Int) {
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      first.addData(0)
+      union.processAllAvailable()
+
+      val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
+      assert(lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark)
+    }
+
+    generateAndAssertNewWatermark(first, Seq(11), 1000)
--- End diff --

Instead of having two variations for handling single input and double inputs, you can do something like this.
```
def getWatermarkAfterData(
    firstData: Seq[Int] = Seq.empty,
    secondData: Seq[Int] = Seq.empty): Long = {
  if (firstData.nonEmpty) first.addData(firstData)
  if (secondData.nonEmpty) second.addData(secondData)
  union.processAllAvailable()
  // add a dummy batch so lastExecution has the new watermark
  first.addData(0)
  union.processAllAvailable()
  // get updated watermark
  val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
  lastExecution.offsetSeqMetadata.batchWatermarkMs
}

assert(getWatermarkAfterData(firstData = Seq(...)) === 1)
assert(getWatermarkAfterData(secondData = Seq(...)) === 1)
assert(getWatermarkAfterData(firstData = Seq(...), secondData = Seq(...)) === 1)
```
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139076625

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,25 @@ class StreamExecution(
       }
       if (hasNewData) {
         var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-        // Update the eventTime watermark if we find one in the plan.
+        // Update the eventTime watermarks if we find any in the plan.
         if (lastExecution != null) {
           lastExecution.executedPlan.collect {
-            case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+            case e: EventTimeWatermarkExec => e
+          }.zipWithIndex.foreach {
+            case (e, index) if e.eventTimeStats.value.count > 0 =>
               logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-              e.eventTimeStats.value.max - e.delayMs
-          }.headOption.foreach { newWatermarkMs =>
+              val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+              val mappedWatermarkMs: Option[Long] = watermarkMsMap.get(index)
+              if (mappedWatermarkMs.isEmpty || newAttributeWatermarkMs > mappedWatermarkMs.get) {
+                watermarkMsMap.put(index, newAttributeWatermarkMs)
+              }
+
+            case _ =>
+          }
+
+          // Update the query watermark to the minimum of all attribute watermarks.
--- End diff --

Add a little more documentation saying that this is the safe thing to do, as the watermark guarantees ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139070352

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,25 @@ class StreamExecution(
       }
       if (hasNewData) {
         var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-        // Update the eventTime watermark if we find one in the plan.
+        // Update the eventTime watermarks if we find any in the plan.
         if (lastExecution != null) {
           lastExecution.executedPlan.collect {
-            case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+            case e: EventTimeWatermarkExec => e
+          }.zipWithIndex.foreach {
+            case (e, index) if e.eventTimeStats.value.count > 0 =>
               logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-              e.eventTimeStats.value.max - e.delayMs
-          }.headOption.foreach { newWatermarkMs =>
+              val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+              val mappedWatermarkMs: Option[Long] = watermarkMsMap.get(index)
--- End diff --

mappedWatermarkMs -> previousWatermarkMs is more semantically meaningful.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139070245

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,25 @@ class StreamExecution(
       }
       if (hasNewData) {
         var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-        // Update the eventTime watermark if we find one in the plan.
+        // Update the eventTime watermarks if we find any in the plan.
         if (lastExecution != null) {
           lastExecution.executedPlan.collect {
-            case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+            case e: EventTimeWatermarkExec => e
+          }.zipWithIndex.foreach {
+            case (e, index) if e.eventTimeStats.value.count > 0 =>
               logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-              e.eventTimeStats.value.max - e.delayMs
-          }.headOption.foreach { newWatermarkMs =>
+              val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
--- End diff --

Also, newAttributeWatermarkMs -> newWatermarkMs. Why "attribute"?
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139070184

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,25 @@ class StreamExecution(
       }
       if (hasNewData) {
         var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-        // Update the eventTime watermark if we find one in the plan.
+        // Update the eventTime watermarks if we find any in the plan.
         if (lastExecution != null) {
           lastExecution.executedPlan.collect {
-            case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+            case e: EventTimeWatermarkExec => e
+          }.zipWithIndex.foreach {
+            case (e, index) if e.eventTimeStats.value.count > 0 =>
               logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-              e.eventTimeStats.value.max - e.delayMs
-          }.headOption.foreach { newWatermarkMs =>
+              val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+              val mappedWatermarkMs: Option[Long] = watermarkMsMap.get(index)
--- End diff --

nit: the Option[Long] type annotation is not needed.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139070141

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,25 @@ class StreamExecution(
       }
       if (hasNewData) {
         var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-        // Update the eventTime watermark if we find one in the plan.
+        // Update the eventTime watermarks if we find any in the plan.
         if (lastExecution != null) {
           lastExecution.executedPlan.collect {
-            case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+            case e: EventTimeWatermarkExec => e
+          }.zipWithIndex.foreach {
+            case (e, index) if e.eventTimeStats.value.count > 0 =>
               logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-              e.eventTimeStats.value.max - e.delayMs
-          }.headOption.foreach { newWatermarkMs =>
+              val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
--- End diff --

Include `e` or `index` in the logDebug message to differentiate between the operators.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139069900

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,24 @@ class StreamExecution(
       }
       if (hasNewData) {
         var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-        // Update the eventTime watermark if we find one in the plan.
+        // Update the eventTime watermarks if we find any in the plan.
--- End diff --

Yeah. I had made this comment when I was thinking that we don't need the mutable map. Ignore this.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139069662

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -130,6 +130,13 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
     batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)

+  /**
+   * A map of current watermarks, keyed by the position of the watermark operator in the
+   * physical plan. The minimum watermark timestamp present here will be used and persisted as the
+   * query's watermark when preparing each batch, so it's ok that this val isn't fault-tolerant.
--- End diff --

Make it clear that "this state is soft state and does not require persistence, because the minimum ..."
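A hypothetical sketch of why the map can be soft state, assuming, as the doc comment says, that only the global watermark is persisted (as `offsetSeqMetadata.batchWatermarkMs`) and that operators which have not reported yet count as 0. After a restart the map starts empty; because the global watermark only moves forward, it stays at the restored value until every operator has reported a higher one. This is roughly the scenario the "test recovery of the minimum after a restart" comment asks for. `RecoverableWatermark` and `RecoveryDemo` are illustrative names, not Spark classes.

```scala
import scala.collection.mutable

// Hypothetical sketch; not a Spark class. Only the global watermark is assumed
// to survive a restart; the per-operator map is soft state rebuilt from scratch.
final class RecoverableWatermark(numOperators: Int, restoredGlobalMs: Long) {
  require(numOperators > 0, "need at least one watermark operator")

  private val operatorWatermarkMs = mutable.HashMap.empty[Int, Long] // soft state
  private var globalWatermarkMs = restoredGlobalMs

  /** Record a candidate watermark for operator `index`; per-operator values never decrease. */
  def observe(index: Int, newWatermarkMs: Long): Unit =
    if (operatorWatermarkMs.get(index).forall(newWatermarkMs > _)) {
      operatorWatermarkMs(index) = newWatermarkMs
    }

  /** Called when preparing a batch: the global watermark only moves forward. */
  def advance(): Long = {
    // Operators that have not reported since the (re)start count as 0.
    val minMs = (0 until numOperators).map(i => operatorWatermarkMs.getOrElse(i, 0L)).min
    if (minMs > globalWatermarkMs) globalWatermarkMs = minMs
    globalWatermarkMs
  }
}

object RecoveryDemo {
  def main(args: Array[String]): Unit = {
    val before = new RecoverableWatermark(2, restoredGlobalMs = 0L)
    before.observe(0, 41000L) // 51 s - 10 s
    before.observe(1, 34000L) // 39 s - 5 s
    val persisted = before.advance() // 34000, the only value written out

    val after = new RecoverableWatermark(2, restoredGlobalMs = persisted)
    after.observe(0, 41000L)
    println(after.advance()) // 34000: operator 1 has not reported since the restart
    after.observe(1, 39000L)
    println(after.advance()) // 39000
  }
}
```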
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139036524

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     }

+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
+    assertQueryWatermark(1000)
+
+    // Watermark stays at 1 from the left when right watermark moves to 2
--- End diff --

nit: change "Watermark" to "Global watermark", to differentiate it from "right watermark" later in the sentence.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139036217

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     }

+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
--- End diff --

You are right. What you have is better.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139036315

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     }

+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
--- End diff --

Can you update the test to use different watermark delays, so that we test that we are choosing not the min delay, but the min watermark?
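With identical delays, the minimum watermark always belongs to the stream with the smallest max event time, so the delay plays no role in which operator wins. With different delays, the operator with the smallest delay can hold the largest watermark. The hypothetical sketch below uses the delays from the later revision of the test (10 s and 5 s) and the state after `secondData = Seq(17, 28, 39)`; `MinDelayVsMinWatermark` and `byMinDelay` are illustrative names, and `byMinDelay` is an incorrect rule included only for contrast.

```scala
// Hypothetical illustration; not Spark code.
object MinDelayVsMinWatermark {
  final case class Op(name: String, maxEventTimeMs: Long, delayMs: Long) {
    def watermarkMs: Long = maxEventTimeMs - delayMs
  }

  /** Incorrect rule, for contrast only: follow the operator with the smallest delay. */
  def byMinDelay(ops: Seq[Op]): Long = ops.minBy(_.delayMs).watermarkMs

  /** The rule under review: the smallest watermark across operators. */
  def byMinWatermark(ops: Seq[Op]): Long = ops.map(_.watermarkMs).min

  def main(args: Array[String]): Unit = {
    val ops = Seq(Op("first", 21000L, 10000L), Op("second", 39000L, 5000L))
    println(byMinDelay(ops))     // 34000 (second: 39 s - 5 s)
    println(byMinWatermark(ops)) // 11000 (first: 21 s - 10 s)
  }
}
```

The later test asserts 11000 for this state, which only the minimum-watermark rule produces.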
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139035990

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     }

+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
--- End diff --

This line break is hard to read. How about you break it with an intermediate variable (e.g. `val lastExecution = ... ; assert(...)`)?
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139035867

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     }

+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
--- End diff --

nit: assertWatermark.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139034057

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -130,6 +130,13 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
     batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)

+  /**
+   * A map from watermarked attributes to their current watermark. The minimum watermark
+   * timestamp present here will be used as the overall query watermark in offsetSeqMetadata;
+   * the query watermark is what's logged and used to age out old state.
+   */
+  protected var attributeWatermarkMsMap: AttributeMap[Long] = AttributeMap(Seq())
--- End diff --

I see. I was mistaken. However, in that case, use a mutable.HashMap instead of a var. The code style we use with Scala is not to use vars unless absolutely needed.
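A `val` bound to a `mutable.HashMap` still carries state across batches: the reference stays fixed while the contents change. That addresses the concern, raised in a reply further down, about persisting and updating the map across batches without a `var`. The two classes below are hypothetical and only contrast the two styles; neither is the code in the PR.

```scala
import scala.collection.mutable

// Hypothetical contrast of the two styles discussed in review; not Spark code.
class VarStyle {
  // A `var` holding an immutable map: every update rebinds the field.
  var watermarksMs: Map[Int, Long] = Map.empty
  def record(index: Int, ms: Long): Unit = {
    watermarksMs = watermarksMs.updated(index, ms)
  }
}

class ValStyle {
  // A `val` holding a mutable map: the reference is fixed and the contents change,
  // so state still accumulates across calls (batches) for the object's lifetime.
  val watermarksMs: mutable.HashMap[Int, Long] = mutable.HashMap.empty
  def record(index: Int, ms: Long): Unit = {
    watermarksMs(index) = ms
  }
}

object StyleDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new ValStyle
    tracker.record(0, 1000L) // batch 1
    tracker.record(0, 3000L) // batch 2 overwrites operator 0
    tracker.record(1, 500L)
    println(tracker.watermarksMs.toMap == Map(0 -> 3000L, 1 -> 500L)) // true
  }
}
```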
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139033482

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     }

+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
--- End diff --

Even better example: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L107
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139033208

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     }

+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
--- End diff --

There is no aggregation here.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139029401

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,24 @@ class StreamExecution(
       }
       if (hasNewData) {
         var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-        // Update the eventTime watermark if we find one in the plan.
+        // Update the eventTime watermarks if we find any in the plan.
--- End diff --

Well, we're updating multiple watermarks in the map. We later update `offsetSeqMetadata` with the new minimum one, but that's not in this block.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139029187

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -130,6 +130,13 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
     batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)

+  /**
+   * A map from watermarked attributes to their current watermark. The minimum watermark
+   * timestamp present here will be used as the overall query watermark in offsetSeqMetadata;
+   * the query watermark is what's logged and used to age out old state.
+   */
+  protected var attributeWatermarkMsMap: AttributeMap[Long] = AttributeMap(Seq())
--- End diff --

This map has to persist and get updated across batches, and I'm not sure how to do that with a local variable or a val.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139028613

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     }

+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
--- End diff --

The problem is that watermark recalculation happens at the beginning of each batch, and to sequence executions I have to call CheckData or CheckLastBatch. So that method ends up producing a test several times longer, since a single entry is:

AddData(realData)
CheckLastBatch
AddData(0)
CheckLastBatch
AssertOnQuery
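A hypothetical single-operator model of the one-batch lag described here. In the diff, the watermark is recomputed only `if (hasNewData)`, when a batch is being prepared, from the event-time stats of `lastExecution`, i.e. the previous batch. So the value stored in `offsetSeqMetadata.batchWatermarkMs` for a batch reflects data from the batch before it, which is why each test helper adds a dummy batch. `OneBatchLag` is an illustrative name; the model assumes the watermark starts at 0 and never decreases.

```scala
// Hypothetical single-operator model of the one-batch lag; not Spark code.
object OneBatchLag {
  /**
   * For each batch, returns the watermark in effect while that batch ran. It is
   * computed when the batch is prepared, from the max event time of the previous batch.
   */
  def watermarksInEffect(maxEventTimeMsPerBatch: Seq[Long], delayMs: Long): Seq[Long] =
    maxEventTimeMsPerBatch
      .scanLeft(0L)((watermarkMs, maxMs) => math.max(watermarkMs, maxMs - delayMs))
      .init // drop the value that would only apply to a batch that has not run yet

  def main(args: Array[String]): Unit = {
    // Real data (11 s) followed by the test's dummy batch (event time 0).
    println(watermarksInEffect(Seq(11000L, 0L), delayMs = 10000L)) // List(0, 1000)
  }
}
```

Without the dummy batch, the last batch to run would still report 0; the 1000 ms watermark derived from the 11 s event only shows up in the batch that follows.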
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19239#discussion_r139026379

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
    @@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
         )
       }
    +  test("watermark with 2 streams") {
    +    val first = MemoryStream[Int]
    +
    +    val firstAggregation = first.toDF()
    +      .withColumn("eventTime", $"value".cast("timestamp"))
    +      .withWatermark("eventTime", "10 seconds")
    +      .select('value)
    +
    +    val second = MemoryStream[Int]
    +
    +    val secondAggregation = second.toDF()
    +      .withColumn("eventTime", $"value".cast("timestamp"))
    +      .withWatermark("eventTime", "10 seconds")
    +      .select('value)
    +
    +    val union = firstAggregation.union(secondAggregation)
    +      .writeStream
    +      .format("memory")
    +      .queryName("test")
    +      .start()
    +
    +    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
    +      stream.addData(data)
    +      union.processAllAvailable()
    +      // add a dummy batch so lastExecution has the new watermark
    +      stream.addData(0)
    +      union.processAllAvailable()
    +    }
    +
    +    def assertQueryWatermark(watermark: Int): Unit = {
    +      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
    +        .lastExecution.offsetSeqMetadata.batchWatermarkMs
    +        == watermark)
    +    }
    +
    +    populateNewWatermarkFromData(first, 11)
    --- End diff --

    Also, I think you can use the `testStream` ... `AddData` ... `AssertOnQuery` pattern; it's cleaner. See:
    https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala#L180
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19239#discussion_r139026066

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
    @@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
         )
       }
    +  test("watermark with 2 streams") {
    +    val first = MemoryStream[Int]
    +
    +    val firstAggregation = first.toDF()
    +      .withColumn("eventTime", $"value".cast("timestamp"))
    +      .withWatermark("eventTime", "10 seconds")
    +      .select('value)
    +
    +    val second = MemoryStream[Int]
    +
    +    val secondAggregation = second.toDF()
    +      .withColumn("eventTime", $"value".cast("timestamp"))
    +      .withWatermark("eventTime", "10 seconds")
    +      .select('value)
    +
    +    val union = firstAggregation.union(secondAggregation)
    +      .writeStream
    +      .format("memory")
    +      .queryName("test")
    +      .start()
    +
    +    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
    +      stream.addData(data)
    +      union.processAllAvailable()
    +      // add a dummy batch so lastExecution has the new watermark
    +      stream.addData(0)
    +      union.processAllAvailable()
    +    }
    +
    +    def assertQueryWatermark(watermark: Int): Unit = {
    +      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
    +        .lastExecution.offsetSeqMetadata.batchWatermarkMs
    +        == watermark)
    +    }
    +
    +    populateNewWatermarkFromData(first, 11)
    --- End diff --

    If these are always used together, then these functions can be merged, right?
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19239#discussion_r139025952

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -560,13 +567,24 @@ class StreamExecution(
         }
         if (hasNewData) {
           var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
    -      // Update the eventTime watermark if we find one in the plan.
    +      // Update the eventTime watermarks if we find any in the plan.
    --- End diff --

    It's still a single watermark that is being updated; it just happens to be updated using multiple watermarks.
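Put as a formula (a sketch of the intended semantics, not code from the PR): each watermark operator i derives its own watermark W_i from the largest event time t_r it has seen, minus the delay d_i passed to withWatermark, and the single query watermark is the minimum over operators. With values from the later revision of the "watermark with 2 streams" test in this thread (delays of 10 s and 5 s; the first stream has seen event times up to 21 s, the second up to 8 s):

```latex
W_i = \Big(\max_{r \in \mathrm{seen}_i} t_r\Big) - d_i, \qquad W_{\mathrm{query}} = \min_i W_i

W_{\mathrm{first}} = 21\,\mathrm{s} - 10\,\mathrm{s} = 11\,\mathrm{s}, \quad
W_{\mathrm{second}} = 8\,\mathrm{s} - 5\,\mathrm{s} = 3\,\mathrm{s}, \quad
W_{\mathrm{query}} = \min(11\,\mathrm{s},\, 3\,\mathrm{s}) = 3\,\mathrm{s} = 3000\,\mathrm{ms}
```

This matches the 3000 ms that the test asserts at that step.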
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19239#discussion_r139025814

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -130,6 +130,13 @@ class StreamExecution(
       protected var offsetSeqMetadata = OffsetSeqMetadata(
         batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)
    +  /**
    +   * A map from watermarked attributes to their current watermark. The minimum watermark
    +   * timestamp present here will be used as the overall query watermark in offsetSeqMetadata;
    +   * the query watermark is what's logged and used to age out old state.
    +   */
    +  protected var attributeWatermarkMsMap: AttributeMap[Long] = AttributeMap(Seq())
    --- End diff --

    This is needed only inside a single function, right? So make it a local variable. Even better, if it doesn't have to be a var, make it a val in the usual functional way.
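For comparison, a hypothetical sketch of the "val in the usual functional way" suggestion: the per-input map becomes an immutable value that each batch transforms and returns, and successive batches are chained with foldLeft. The object and method names, and the String keys standing in for attributes, are illustrative assumptions rather than anything from the PR.

```scala
// Hypothetical sketch: String keys stand in for watermarked attributes.
object FunctionalWatermarks {
  type PerInput = Map[String, Long]

  /** Folds one batch's max event times into a new map; per-input watermarks never move back. */
  def advance(state: PerInput, delaysMs: Map[String, Long], maxEventTimeMs: Map[String, Long]): PerInput =
    maxEventTimeMs.foldLeft(state) { case (acc, (input, maxTs)) =>
      val candidate = maxTs - delaysMs(input)
      acc.updated(input, math.max(acc.getOrElse(input, Long.MinValue), candidate))
    }
}

val delays = Map("first" -> 10000L, "second" -> 5000L)
val batches = Seq(Map("first" -> 11000L), Map("second" -> 6000L), Map("second" -> 8000L))

// Each step returns a new immutable map; nothing is reassigned.
val finalState = batches.foldLeft(Map.empty[String, Long]) { (state, batch) =>
  FunctionalWatermarks.advance(state, delays, batch)
}
// Per-input watermarks after three batches: first = 1000, second = 3000.
println(finalState)
```

The fold only works because the whole batch sequence is known up front. A running query receives batches one trigger at a time, so something still has to hold the latest map between triggers, which is the concern raised in joseph-torres's reply.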
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19239#discussion_r139025554

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
    @@ -39,7 +39,7 @@ class IncrementalExecution(
         val checkpointLocation: String,
         val runId: UUID,
         val currentBatchId: Long,
    -    offsetSeqMetadata: OffsetSeqMetadata)
    +    private[sql] val offsetSeqMetadata: OffsetSeqMetadata)
    --- End diff --

    Just make it a val. Anything inside sql.execution does not show up in the docs, so we keep such fields as plain vals so that we can debug when we need to dig deep.
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/19239

    [SPARK-22017] Take minimum of all watermark execs in StreamExecution.

    ## What changes were proposed in this pull request?

    Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.

    ## How was this patch tested?

    new unit test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-22017

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19239.patch

To close this pull request, make a commit to your master/trunk
branch with (at least) the following in the commit message:

    This closes #19239

commit 4a7f53fdab1e5e640e156a4a3d2ba27837195195
Author: Jose Torres
Date:   2017-09-13T21:49:23Z

    Implement multiple watermark StreamExecution support.

commit 9b9cd19106fae9a2de268eb2b559ca1bf159e9c2
Author: Jose Torres
Date:   2017-09-14T18:30:40Z

    partially fix test

commit 6a4c80b696f42a445c7f846fada3f823e04bd3ab
Author: Jose Torres
Date:   2017-09-14T21:52:16Z

    Finish rewriting test
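A small illustration of why the minimum is the safe choice, in plain Scala with invented watermark values: if the query adopted the faster input's watermark, a row from the slower input that its own watermark still admits would already count as late. The object and member names are hypothetical, and "event time below the watermark means too late" is a simplification of how Spark uses the query watermark to drop late data and age out old state.

```scala
// Hypothetical illustration; all values are in milliseconds and made up for the example.
object WatermarkChoice {
  /** Simplified lateness rule: anything older than the query watermark is too late. */
  def isTooLate(eventTimeMs: Long, queryWatermarkMs: Long): Boolean =
    eventTimeMs < queryWatermarkMs

  // Per-input watermarks: one input has advanced much further than the other.
  val perInputWatermarkMs: Map[String, Long] = Map("fast" -> 30000L, "slow" -> 5000L)

  val minimumWatermarkMs: Long = perInputWatermarkMs.values.min // 5000
  val fastestWatermarkMs: Long = perInputWatermarkMs.values.max // 30000

  // A 12 s row from the slow input is still within that input's own watermark (5 s).
  val slowRowMs: Long = 12000L
}

import WatermarkChoice._
println(isTooLate(slowRowMs, minimumWatermarkMs)) // false: kept under the minimum
println(isTooLate(slowRowMs, fastestWatermarkMs)) // true: treated as late under the fastest input's watermark
```

Taking the minimum means the query only discards data or state once every input has moved past it, at the cost of the faster input's state being held somewhat longer.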