[GitHub] spark pull request #18840: [SPARK-21565] Propagate metadata in attribute rep...

2017-08-07 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/18840#discussion_r131707863
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -391,6 +391,30 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     checkDataset[Long](df, 1L to 100L: _*)
   }
 
+  test("SPARK-21565: watermark operator accepts attributes from replacement") {
+    withTempDir { dir =>
+      dir.delete()
+
+      val df = Seq(("a", 100.0, new java.sql.Timestamp(100L)))
+        .toDF("symbol", "price", "eventTime")
+      df.write.json(dir.getCanonicalPath)
+
+      val input = spark.readStream.schema(df.schema)
+        .json(dir.getCanonicalPath)
+
+      val groupEvents = input
+        .withWatermark("eventTime", "2 seconds")
+        .groupBy("symbol", "eventTime")
+        .agg(count("price") as 'count)
+        .select("symbol", "eventTime", "count")
+      val q = groupEvents.writeStream
+        .outputMode("append")
+        .format("console")
+        .start()
+      q.processAllAvailable()
--- End diff --

Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18840: [SPARK-21565] Propagate metadata in attribute rep...

2017-08-03 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18840#discussion_r131309601
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -391,6 +391,30 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     checkDataset[Long](df, 1L to 100L: _*)
   }
 
+  test("SPARK-21565: watermark operator accepts attributes from replacement") {
+    withTempDir { dir =>
+      dir.delete()
+
+      val df = Seq(("a", 100.0, new java.sql.Timestamp(100L)))
+        .toDF("symbol", "price", "eventTime")
+      df.write.json(dir.getCanonicalPath)
+
+      val input = spark.readStream.schema(df.schema)
+        .json(dir.getCanonicalPath)
+
+      val groupEvents = input
+        .withWatermark("eventTime", "2 seconds")
+        .groupBy("symbol", "eventTime")
+        .agg(count("price") as 'count)
+        .select("symbol", "eventTime", "count")
+      val q = groupEvents.writeStream
+        .outputMode("append")
+        .format("console")
+        .start()
+      q.processAllAvailable()
--- End diff --

nit: `q.processAllAvailable()` ->
```
try {
  q.processAllAvailable()
} finally {
  q.stop()
}
```
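The reviewer's suggestion is the usual try/finally cleanup pattern. Below is a minimal sketch of factoring it into a reusable helper. It assumes a hypothetical `Query` trait that models only the two `StreamingQuery` methods used in the snippet (`processAllAvailable()` and `stop()`). `Query` and `QueryCleanup.withStoppedAfter` are illustrative names, not Spark APIs:

```scala
// Hypothetical stand-in for Spark's StreamingQuery: only the two methods
// used in the reviewer's snippet are modeled here.
trait Query {
  def processAllAvailable(): Unit
  def stop(): Unit
}

object QueryCleanup {
  // Runs `body` against the query, then always calls stop(), even when
  // `body` throws. The exception still propagates to the caller.
  def withStoppedAfter[T](q: Query)(body: Query => T): T = {
    try {
      body(q)
    } finally {
      q.stop()
    }
  }
}
```

Because `stop()` sits in the `finally` block, a failure inside `processAllAvailable()` still stops the query instead of leaving it running after the test has failed.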





[GitHub] spark pull request #18840: [SPARK-21565] Propagate metadata in attribute rep...

2017-08-03 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/18840

[SPARK-21565] Propagate metadata in attribute replacement.

## What changes were proposed in this pull request?

Propagate metadata in attribute replacement during streaming execution.
This is necessary for EventTimeWatermark operators that consume replaced attributes.
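A toy model of the bug class described above. It assumes a simplified attribute that has a name, an id, and a string metadata map. `Attr`, `Replacement`, and the `watermarkDelayMs` key are hypothetical and are not Catalyst's `AttributeReference` API. The sketch only shows why swapping in a new attribute has to carry the old attribute's metadata along:

```scala
// Toy column reference (hypothetical, NOT Catalyst's AttributeReference):
// an identity plus free-form metadata, e.g. a watermark delay.
final case class Attr(name: String, exprId: Long, metadata: Map[String, String])

object Replacement {
  // Naive replacement: swap each attribute for the one the new source
  // produced. Metadata on the old attribute (e.g. a watermark delay) is lost.
  def replaceNaive(plan: Seq[Attr], replacements: Map[Long, Attr]): Seq[Attr] =
    plan.map(a => replacements.getOrElse(a.exprId, a))

  // Propagating replacement: take the new attribute's identity, but keep
  // the metadata the old attribute carried.
  def replacePropagating(plan: Seq[Attr], replacements: Map[Long, Attr]): Seq[Attr] =
    plan.map { a =>
      replacements.get(a.exprId) match {
        case Some(fresh) => fresh.copy(metadata = a.metadata)
        case None        => a
      }
    }
}
```

With the naive version, an operator that looks for the watermark delay on the replaced attribute (here "2000" ms, matching the test's "2 seconds") would find nothing. The propagating version keeps it.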

## How was this patch tested?
New unit test, which was verified to fail before the fix.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-21565

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18840.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18840


commit e54d81200569c2260f0995b2f91aa9829dc10ad7
Author: Jose Torres 
Date:   2017-08-04T03:52:57Z

Propagate metadata in attribute replacement.



