[GitHub] spark pull request #18840: [SPARK-21565] Propagate metadata in attribute rep...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18840#discussion_r131707863

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
    @@ -391,6 +391,30 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
         checkDataset[Long](df, 1L to 100L: _*)
       }

    +  test("SPARK-21565: watermark operator accepts attributes from replacement") {
    +    withTempDir { dir =>
    +      dir.delete()
    +
    +      val df = Seq(("a", 100.0, new java.sql.Timestamp(100L)))
    +        .toDF("symbol", "price", "eventTime")
    +      df.write.json(dir.getCanonicalPath)
    +
    +      val input = spark.readStream.schema(df.schema)
    +        .json(dir.getCanonicalPath)
    +
    +      val groupEvents = input
    +        .withWatermark("eventTime", "2 seconds")
    +        .groupBy("symbol", "eventTime")
    +        .agg(count("price") as 'count)
    +        .select("symbol", "eventTime", "count")
    +      val q = groupEvents.writeStream
    +        .outputMode("append")
    +        .format("console")
    +        .start()
    +      q.processAllAvailable()
    --- End diff --

    Done.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18840: [SPARK-21565] Propagate metadata in attribute rep...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18840#discussion_r131309601

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
    @@ -391,6 +391,30 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
         checkDataset[Long](df, 1L to 100L: _*)
       }

    +  test("SPARK-21565: watermark operator accepts attributes from replacement") {
    +    withTempDir { dir =>
    +      dir.delete()
    +
    +      val df = Seq(("a", 100.0, new java.sql.Timestamp(100L)))
    +        .toDF("symbol", "price", "eventTime")
    +      df.write.json(dir.getCanonicalPath)
    +
    +      val input = spark.readStream.schema(df.schema)
    +        .json(dir.getCanonicalPath)
    +
    +      val groupEvents = input
    +        .withWatermark("eventTime", "2 seconds")
    +        .groupBy("symbol", "eventTime")
    +        .agg(count("price") as 'count)
    +        .select("symbol", "eventTime", "count")
    +      val q = groupEvents.writeStream
    +        .outputMode("append")
    +        .format("console")
    +        .start()
    +      q.processAllAvailable()
    --- End diff --

    nit: `q.processAllAvailable()` ->
    ```
    try {
      q.processAllAvailable()
    } finally {
      q.stop()
    }
    ```
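The point of the nit above is that a test which starts a streaming query should stop it even when processing throws, so the query does not leak into later tests. A minimal sketch of that pattern in plain Scala follows. `FakeQuery`, `StopQuerySketch`, and `runAndStop` are hypothetical names introduced only for illustration; they are not Spark's `StreamingQuery` API and not code from the pull request.

```scala
// Hypothetical stand-in for a streaming query; NOT Spark's StreamingQuery.
final class FakeQuery(failOnProcess: Boolean) {
  private var active = true

  def isActive: Boolean = active

  // Simulates draining all available input; optionally fails.
  def processAllAvailable(): Unit = {
    if (failOnProcess) throw new IllegalStateException("simulated failure")
  }

  def stop(): Unit = { active = false }
}

object StopQuerySketch {
  // Processes the query and always stops it, even if processing throws.
  def runAndStop(q: FakeQuery): Unit = {
    try {
      q.processAllAvailable()
    } finally {
      q.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val ok = new FakeQuery(failOnProcess = false)
    runAndStop(ok)
    println(s"stopped after success: ${!ok.isActive}")

    val failing = new FakeQuery(failOnProcess = true)
    // The exception still propagates; finally only guarantees the cleanup.
    try runAndStop(failing)
    catch { case _: IllegalStateException => () }
    println(s"stopped after failure: ${!failing.isActive}")
  }
}
```

Note that `finally` does not swallow the exception, so a failing test would still fail; it would just no longer leave a running query behind.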
[GitHub] spark pull request #18840: [SPARK-21565] Propagate metadata in attribute rep...
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/18840

    [SPARK-21565] Propagate metadata in attribute replacement.

    ## What changes were proposed in this pull request?

    Propagate metadata in attribute replacement during streaming execution. This is necessary for EventTimeWatermarks consuming replaced attributes.

    ## How was this patch tested?

    new unit test, which was verified to fail before the fix

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-21565

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18840.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18840

----
commit e54d81200569c2260f0995b2f91aa9829dc10ad7
Author: Jose Torres
Date:   2017-08-04T03:52:57Z

    Propagate metadata in attribute replacement.
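To illustrate why metadata has to travel with a replaced attribute, here is a toy model in plain Scala. It is a sketch under stated assumptions, not the change made in the pull request: `Attr`, `DelayKey`, and both replacement functions are hypothetical, and the real Catalyst attribute classes are considerably richer. The idea is that a watermark operator recognizes its event-time column through metadata attached to the attribute, so a replacement that swaps in a fresh attribute without copying that metadata leaves the operator with nothing to find.

```scala
// Toy model of a plan attribute; hypothetical, not a Catalyst class.
final case class Attr(
    name: String,
    exprId: Long,
    metadata: Map[String, String] = Map.empty)

object MetadataPropagationSketch {
  // Hypothetical metadata key that marks a column as carrying a watermark delay.
  val DelayKey = "watermarkDelayMs"

  // Naive replacement: swaps in the new attribute and loses the old metadata.
  def replaceDroppingMetadata(a: Attr, replacements: Map[Long, Attr]): Attr =
    replacements.getOrElse(a.exprId, a)

  // Replacement that copies the original attribute's metadata onto the new one.
  def replaceKeepingMetadata(a: Attr, replacements: Map[Long, Attr]): Attr =
    replacements.get(a.exprId).map(_.copy(metadata = a.metadata)).getOrElse(a)

  def main(args: Array[String]): Unit = {
    // Attribute in the logical plan, tagged with a 2-second watermark delay.
    val eventTime = Attr("eventTime", exprId = 1L, metadata = Map(DelayKey -> "2000"))
    // Attribute produced by the source for the current batch; it has no metadata.
    val replacements = Map(1L -> Attr("eventTime", exprId = 42L))

    val dropped = replaceDroppingMetadata(eventTime, replacements)
    val kept = replaceKeepingMetadata(eventTime, replacements)

    println(s"naive replacement keeps delay: ${dropped.metadata.contains(DelayKey)}")
    println(s"propagating replacement keeps delay: ${kept.metadata.contains(DelayKey)}")
  }
}
```

In this model both functions return the replacement attribute (`exprId` 42), but only the second still carries the delay entry that a downstream watermark check would look for.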