[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15949


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89894205
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
--- End diff --

I just tried writing a deterministic test using a manual clock:
```
  test("metadata is recovered from log when query is restarted") {
    import testImplicits._
    val clock = new StreamManualClock()
    val inputData = MemoryStream[Long]
    val df = inputData.toDF()
      .select(current_timestamp().cast("long").as[Long])
    testStream(df)(
      StartStream(trigger = ProcessingTime("1 second"), triggerClock = clock),
      AssertOnQuery { _ =>
        // Make sure the clock is waiting. Otherwise, the batch time may be 0 or 1.
        eventually(timeout(streamingTimeout)) {
          assert(clock.isStreamWaitingAt(0L))
        }
        true
      },
      AddData(inputData, 1L), // Trigger one output row
      AdvanceManualClock(1000L),
      CheckLastBatch(1L),
      AddData(inputData, 1L), // Trigger one output row
      AdvanceManualClock(1000L),
      CheckLastBatch(2L),
      StopStream,
      AssertOnQuery { q => // clear the sink
        q.sink.asInstanceOf[MemorySink].clear()
        true
      },
      StartStream(trigger = ProcessingTime("1 second"), triggerClock = clock),
      CheckLastBatch(2L),
      AddData(inputData, 1L), // Trigger one output row
      AdvanceManualClock(1000L),
      CheckLastBatch(3L)
    )
  }
```
You can use it to replace yours.
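
For readers unfamiliar with the idea, here is a minimal sketch of why a manually advanced clock makes the expected values (1, 2, 3) predictable. `ToyManualClock` and `ToyManualClockDemo` are hypothetical names made up for illustration; this is not Spark's `StreamManualClock`, only a toy model of the same principle: time moves only when the test says so.

```scala
// Toy stand-in for a manual test clock (hypothetical; NOT Spark's StreamManualClock).
final class ToyManualClock(private var nowMs: Long = 0L) {
  def getTimeMillis(): Long = synchronized { nowMs }

  // Time moves forward only when the test asks for it.
  def advance(deltaMs: Long): Unit = synchronized {
    require(deltaMs >= 0, "time cannot go backwards")
    nowMs += deltaMs
  }
}

object ToyManualClockDemo {
  // Mimics current_timestamp().cast("long"): whole seconds derived from the clock.
  def currentTimestampSeconds(clock: ToyManualClock): Long =
    clock.getTimeMillis() / 1000L

  // Three "triggers", each preceded by a 1000 ms advance, similar in spirit
  // to the AdvanceManualClock(1000L) steps in the test above.
  def run(): Seq[Long] = {
    val clock = new ToyManualClock()
    (1 to 3).map { _ =>
      clock.advance(1000L)
      currentTimestampSeconds(clock)
    }
  }
}
```

Because nothing but `advance` changes the clock, the observed timestamps do not depend on how fast the machine runs the test.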





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89873586
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
+val t1 = clock.getTimeMillis() + 60L * 1000L
+val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L
+val q = startQuery
+ms.addData(t1, t2)
+q.processAllAvailable()
+
+checkAnswer(
+  spark.table(tableName),
+  Seq(Row(t1, 1), Row(t2, 1))
+)
+
+q.stop()
+Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2
--- End diff --

I should also say that I'm not too concerned about the nondeterministic
system clock issues, since the batchTimestamp is recorded prior to running
the query. Therefore, as long as the query gets planned within 10 seconds,
we're good.
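
A minimal sketch of the point being made, under stated assumptions: if the batch timestamp is written to a log before the batch runs, a replay of the same batch after a restart sees the original timestamp rather than the current time. `ToyBatchLog` and `ToyRecovery` are hypothetical names for illustration only and do not correspond to Spark classes.

```scala
import scala.collection.mutable

// Hypothetical toy log mapping batchId -> batchTimestampMs (not a Spark API).
final class ToyBatchLog {
  private val timestamps = mutable.LinkedHashMap.empty[Long, Long]

  // Record the timestamp before the batch runs; an already-logged entry wins,
  // so a replayed batch reuses the timestamp from its first run.
  def timestampFor(batchId: Long, nowMs: => Long): Long =
    timestamps.getOrElseUpdate(batchId, nowMs)
}

object ToyRecovery {
  // Keeps only values that are not older than the batch timestamp,
  // loosely modelled on a filter against current_timestamp.
  def runBatch(log: ToyBatchLog, batchId: Long, nowMs: Long, data: Seq[Long]): Seq[Long] = {
    val ts = log.timestampFor(batchId, nowMs)
    data.filter(_ >= ts)
  }
}
```

In this toy model, running batch 0 at time 0 and replaying it much later returns the same rows, while a genuinely new batch is filtered against the later time.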





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89861730
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
+val t1 = clock.getTimeMillis() + 60L * 1000L
+val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L
+val q = startQuery
+ms.addData(t1, t2)
+q.processAllAvailable()
+
+checkAnswer(
+  spark.table(tableName),
+  Seq(Row(t1, 1), Row(t2, 1))
+)
+
+q.stop()
+Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2
--- End diff --

Actually, we can remove this test completely. Instead, the existing
current_time and current_date tests can be extended to test recovery. There is
no point in having all this extra code and `Thread.sleep` calls that are going
to be flaky.





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89859904
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala ---
@@ -96,27 +96,42 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 )
   }
 
-  ignore("recovery") {
+  test("recovery") {
 val inputData = MemoryStream[Int]
-
-val windowedAggregation = inputData.toDF()
-.withColumn("eventTime", $"value".cast("timestamp"))
-.withWatermark("eventTime", "10 seconds")
-.groupBy(window($"eventTime", "5 seconds") as 'window)
-.agg(count("*") as 'count)
-.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-testStream(windowedAggregation)(
+val df = inputData.toDF()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "10 seconds")
+  .groupBy(window($"eventTime", "5 seconds") as 'window)
+  .agg(count("*") as 'count)
+  .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+val outputMode = OutputMode.Append
--- End diff --

`outputMode` and `memorySink` are not used.





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89856411
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
+val t1 = clock.getTimeMillis() + 60L * 1000L
+val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L
+val q = startQuery
+ms.addData(t1, t2)
+q.processAllAvailable()
+
+checkAnswer(
+  spark.table(tableName),
+  Seq(Row(t1, 1), Row(t2, 1))
+)
+
+q.stop()
+Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2
--- End diff --

@zsxwing I don't see an obvious way to pass a `StreamManualClock` to
`DataStreamWriter.start()`. Should I be taking an entirely different approach?





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89851054
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
+val t1 = clock.getTimeMillis() + 60L * 1000L
+val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L
+val q = startQuery
+ms.addData(t1, t2)
+q.processAllAvailable()
+
+checkAnswer(
+  spark.table(tableName),
+  Seq(Row(t1, 1), Row(t2, 1))
+)
+
+q.stop()
+Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2
--- End diff --

Why not use `StreamManualClock` to avoid the nondeterministic system clock?





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89739781
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ---
@@ -235,4 +239,85 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
   CheckLastBatch(("a", 30), ("b", 3), ("c", 1))
 )
   }
+
+  test("prune results by current_time, complete mode") {
+import testImplicits._
+import StreamingAggregationSuite._
+clock = new StreamManualClock
+
+val inputData = MemoryStream[Long]
+
+val aggregated =
+  inputData.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.where('value >= current_timestamp().cast("long") - 10L)
+
+testStream(aggregated, Complete)(
+  StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+
+  // advance clock to 10 seconds
+  AddData(inputData, 0L, 5L, 5L, 10L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+
+  // advance clock to 20 seconds, should retain keys >= 10
+  AddData(inputData, 15L, 15L, 20L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+
+  // advance clock to 30 seconds, should retain keys >= 20
+  AddData(inputData, 0L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((20L, 1)),
+
+  // advance clock to 40 seconds, should retain keys >= 30
+  AddData(inputData, 25L, 30L, 40L, 45L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((30L, 1), (40L, 1), (45L, 1))
+)
+  }
+
+
+  test("prune results by current_date, complete mode") {
+import testImplicits._
+import StreamingAggregationSuite._
+clock = new StreamManualClock
+val tz = TimeZone.getDefault.getID
+val inputData = MemoryStream[Long]
+val aggregated =
+  inputData.toDF()
+.select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
+.toDF("value")
+.groupBy($"value")
+.agg(count("*"))
+// .select('value, date_sub(current_date(), 10).cast("timestamp").alias("t"))
+// .select('value, 't, 'value >= 't)
--- End diff --

Please remove these commented-out lines.





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89745152
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
+val t1 = clock.getTimeMillis() + 60L * 1000L
+val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L
+val q = startQuery
+ms.addData(t1, t2)
+q.processAllAvailable()
+
+checkAnswer(
+  spark.table(tableName),
+  Seq(Row(t1, 1), Row(t2, 1))
+)
+
+q.stop()
+Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2
+assert(t1 < clock.getTimeMillis())
+assert(t2 < clock.getTimeMillis())
+
+spark.sql(s"drop table $tableName")
+
+// verify table is dropped
+intercept[AnalysisException](spark.table(tableName).collect())
+val q2 = startQuery
+q2.processAllAvailable()
+checkAnswer(
+  spark.table(tableName),
+  Seq(Row(t1, 1), Row(t2, 1))
+)
+
+q2.stop()
+
--- End diff --

nit: extra line.





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89739983
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala ---
@@ -96,27 +96,42 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 )
   }
 
-  ignore("recovery") {
+  test("recovery") {
 val inputData = MemoryStream[Int]
-
-val windowedAggregation = inputData.toDF()
-.withColumn("eventTime", $"value".cast("timestamp"))
-.withWatermark("eventTime", "10 seconds")
-.groupBy(window($"eventTime", "5 seconds") as 'window)
-.agg(count("*") as 'count)
-.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-testStream(windowedAggregation)(
+val df = inputData.toDF()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "10 seconds")
+  .groupBy(window($"eventTime", "5 seconds") as 'window)
+  .agg(count("*") as 'count)
+  .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+val outputMode = OutputMode.Append
+val memorySink = new MemorySink(df.schema, outputMode)
+testStream(df)(
   AddData(inputData, 10, 11, 12, 13, 14, 15),
   CheckAnswer(),
--- End diff --

nit: Change this CheckAnswer to CheckLastBatch, to be consistent with the
rest of the checks in this test.
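
To illustrate the difference between the two styles of check, here is a toy sketch. It assumes, as I understand the `StreamTest` helpers, that a `CheckAnswer`-style check compares against everything accumulated in the sink, while a `CheckLastBatch`-style check looks only at the most recent batch. `ToySink` is a hypothetical class made up for this example, not Spark's `MemorySink`.

```scala
// Toy sink that remembers every batch it receives (hypothetical; NOT Spark's MemorySink).
final class ToySink[A] {
  private var batches = Vector.empty[Seq[A]]

  def addBatch(rows: Seq[A]): Unit = batches :+= rows

  // What a CheckAnswer-style assertion would compare against: all rows so far.
  def allRows: Seq[A] = batches.flatten

  // What a CheckLastBatch-style assertion would compare against: the latest batch only.
  def lastBatch: Seq[A] = batches.lastOption.getOrElse(Seq.empty)
}
```

When the first batch is empty, as in the test under review, both views are empty, so the two checks would agree there; the suggestion is about consistency rather than a behavior change.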





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89746706
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
--- End diff --

nit: functions that have side effects (like starting a thread) are usually 
declared with `()` and called with `()`, for example `val q = startQuery()`.
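
A minimal sketch of the convention described above, in plain Scala with no Spark dependency. `FakeQuery`, `startedCount`, and the counter are hypothetical names introduced only for illustration; only `startQuery()` comes from the review comment.

```scala
object SideEffectConvention {
  // Hypothetical stand-in for a query handle; this is not a Spark class.
  final class FakeQuery(val id: Int)

  private var started = 0

  // Pure accessor: no side effects, so it is declared and called without `()`.
  def startedCount: Int = started

  // Side-effecting method: declared with `()` and called with `()`.
  def startQuery(): FakeQuery = {
    started += 1
    new FakeQuery(started)
  }

  def main(args: Array[String]): Unit = {
    val q1 = startQuery()
    val q2 = startQuery()
    println(s"started $startedCount queries, ids ${q1.id} and ${q2.id}")
  }
}
```

The empty parentheses at the call site signal to the reader that the call does more than return a value.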





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89739836
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -235,4 +239,85 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
   CheckLastBatch(("a", 30), ("b", 3), ("c", 1))
 )
   }
+
+  test("prune results by current_time, complete mode") {
+import testImplicits._
+import StreamingAggregationSuite._
+clock = new StreamManualClock
+
+val inputData = MemoryStream[Long]
+
+val aggregated =
+  inputData.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.where('value >= current_timestamp().cast("long") - 10L)
+
+testStream(aggregated, Complete)(
+  StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+
+  // advance clock to 10 seconds
+  AddData(inputData, 0L, 5L, 5L, 10L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+
+  // advance clock to 20 seconds, should retain keys >= 10
+  AddData(inputData, 15L, 15L, 20L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+
+  // advance clock to 30 seconds, should retain keys >= 20
+  AddData(inputData, 0L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((20L, 1)),
+
+  // advance clock to 40 seconds, should retain keys >= 30
+  AddData(inputData, 25L, 30L, 40L, 45L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((30L, 1), (40L, 1), (45L, 1))
+)
+  }
+
+
+  test("prune results by current_date, complete mode") {
+import testImplicits._
+import StreamingAggregationSuite._
+clock = new StreamManualClock
+val tz = TimeZone.getDefault.getID
+val inputData = MemoryStream[Long]
+val aggregated =
+  inputData.toDF()
+.select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
+.toDF("value")
+.groupBy($"value")
+.agg(count("*"))
+// .select('value, date_sub(current_date(), 10).cast("timestamp").alias("t"))
+// .select('value, 't, 'value >= 't)
+.where($"value".cast("date") >= date_sub(current_date(), 10))
+.select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
+testStream(aggregated, Complete)(
+  StartStream(ProcessingTime("10 day"), triggerClock = clock),
+  // advance clock to 10 days, should retain all keys
+  AddData(inputData, 0L, 5L, 5L, 10L),
+  AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+  CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+  // advance clock to 20 days, should retain keys >= 10
+  AddData(inputData, 15L, 15L, 20L),
+  AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+  CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+  // advance clock to 30 days, should retain keys >= 20
+  AddData(inputData, 0L),
+  AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+  CheckLastBatch((20L, 1)),
+  // advance clock to 40 seconds, should retain keys >= 30
--- End diff --

40 seconds -> 40 days





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89747045
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
+val t1 = clock.getTimeMillis() + 60L * 1000L
+val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L
--- End diff --

add a comment explaining how the test works.





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89744923
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala ---
@@ -96,27 +96,42 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 )
   }
 
-  ignore("recovery") {
+  test("recovery") {
 val inputData = MemoryStream[Int]
-
-val windowedAggregation = inputData.toDF()
-.withColumn("eventTime", $"value".cast("timestamp"))
-.withWatermark("eventTime", "10 seconds")
-.groupBy(window($"eventTime", "5 seconds") as 'window)
-.agg(count("*") as 'count)
-.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-testStream(windowedAggregation)(
+val df = inputData.toDF()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "10 seconds")
+  .groupBy(window($"eventTime", "5 seconds") as 'window)
+  .agg(count("*") as 'count)
+  .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+val outputMode = OutputMode.Append
+val memorySink = new MemorySink(df.schema, outputMode)
+testStream(df)(
   AddData(inputData, 10, 11, 12, 13, 14, 15),
   CheckAnswer(),
   AddData(inputData, 25), // Advance watermark to 15 seconds
   StopStream,
   StartStream(),
-  CheckAnswer(),
+  CheckLastBatch(),
   AddData(inputData, 25), // Evict items less than previous watermark.
+  CheckLastBatch((10, 5)),
   StopStream,
+  AssertOnQuery { q => // clear the sink
+q.sink.asInstanceOf[MemorySink].clear()
+true
+  },
   StartStream(),
-  CheckAnswer((10, 5))
+  CheckLastBatch((10, 5)),
--- End diff --

nit: add a comment to explain this, e.g. 
`// Should recompute the last batch and re-evict timestamp 10`





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89748128
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
+val t1 = clock.getTimeMillis() + 60L * 1000L
+val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L
+val q = startQuery
+ms.addData(t1, t2)
+q.processAllAvailable()
+
+checkAnswer(
+  spark.table(tableName),
+  Seq(Row(t1, 1), Row(t2, 1))
+)
+
+q.stop()
+Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2
+assert(t1 < clock.getTimeMillis())
+assert(t2 < clock.getTimeMillis())
+
+spark.sql(s"drop table $tableName")
+
+// verify table is dropped
+intercept[AnalysisException](spark.table(tableName).collect())
--- End diff --

I think you can use `spark.catalog.tableExists`.





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89747718
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
+val t1 = clock.getTimeMillis() + 60L * 1000L
+val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L
+val q = startQuery
+ms.addData(t1, t2)
+q.processAllAvailable()
+
+checkAnswer(
+  spark.table(tableName),
+  Seq(Row(t1, 1), Row(t2, 1))
+)
+
+q.stop()
+Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2
--- End diff --

This test will now take 60 seconds! I didn't quite understand the test 
earlier, but now I do. I think the earlier 5 seconds was closer to being 
fine. Okay, let's just use 10 seconds. And instead of sleeping, use 
`eventually` to check the condition `t2 < clock.getTimeMillis()`. That way 
the test sleeps no longer than necessary.
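
The idea of polling a condition instead of sleeping for a fixed time can be sketched in plain Scala. The `pollUntil` helper below is a hypothetical stand-in written for illustration; it is not ScalaTest's `eventually`, which throws on timeout rather than returning a Boolean, and the 200 ms offset is an assumed value chosen to keep the example fast.

```scala
import scala.annotation.tailrec

object PollUntil {
  // Hypothetical helper, not ScalaTest's implementation: re-evaluates
  // `condition` every `intervalMs` until it holds or `timeoutMs` elapses.
  def pollUntil(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    @tailrec
    def loop(): Boolean =
      if (condition) true
      else if (System.currentTimeMillis() >= deadline) false
      else { Thread.sleep(intervalMs); loop() }
    loop()
  }

  def main(args: Array[String]): Unit = {
    // t2 lies slightly in the future, like the timestamps in the test.
    val t2 = System.currentTimeMillis() + 200L
    // Returns as soon as t2 has passed, rather than sleeping a fixed amount.
    val expired = pollUntil(timeoutMs = 10000L, intervalMs = 20L) {
      t2 < System.currentTimeMillis()
    }
    assert(expired, "t2 should have expired well within the timeout")
  }
}
```

The timeout only bounds the worst case; in the common case the wait ends within one polling interval of the condition becoming true.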





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89745095
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -235,4 +239,85 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
   CheckLastBatch(("a", 30), ("b", 3), ("c", 1))
 )
   }
+
+  test("prune results by current_time, complete mode") {
+import testImplicits._
+import StreamingAggregationSuite._
+clock = new StreamManualClock
+
+val inputData = MemoryStream[Long]
+
+val aggregated =
+  inputData.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.where('value >= current_timestamp().cast("long") - 10L)
+
+testStream(aggregated, Complete)(
+  StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+
+  // advance clock to 10 seconds
+  AddData(inputData, 0L, 5L, 5L, 10L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+
+  // advance clock to 20 seconds, should retain keys >= 10
+  AddData(inputData, 15L, 15L, 20L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+
+  // advance clock to 30 seconds, should retain keys >= 20
+  AddData(inputData, 0L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((20L, 1)),
+
+  // advance clock to 40 seconds, should retain keys >= 30
+  AddData(inputData, 25L, 30L, 40L, 45L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((30L, 1), (40L, 1), (45L, 1))
+)
+  }
+
--- End diff --

nit: extra line





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89746924
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
--- End diff --

what does this comment mean?





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89424696
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala ---
@@ -96,28 +96,58 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 )
   }
 
-  ignore("recovery") {
-val inputData = MemoryStream[Int]
-
-val windowedAggregation = inputData.toDF()
+  test("recovery") {
+val ms = new MemoryStream[Int](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val tableName = "recovery"
+def startQuery: StreamingQuery = {
+  ms.toDF()
 .withColumn("eventTime", $"value".cast("timestamp"))
 .withWatermark("eventTime", "10 seconds")
 .groupBy(window($"eventTime", "5 seconds") as 'window)
 .agg(count("*") as 'count)
 .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+.writeStream
+.format("memory")
+.queryName(tableName)
+.outputMode("append")
+.start()
+}
 
-testStream(windowedAggregation)(
-  AddData(inputData, 10, 11, 12, 13, 14, 15),
-  CheckAnswer(),
-  AddData(inputData, 25), // Advance watermark to 15 seconds
-  StopStream,
-  StartStream(),
-  CheckAnswer(),
-  AddData(inputData, 25), // Evict items less than previous watermark.
-  StopStream,
-  StartStream(),
-  CheckAnswer((10, 5))
+var q = startQuery
+ms.addData(10, 11, 12, 13, 14, 15)
+q.processAllAvailable()
+
+checkAnswer(
+  spark.table(tableName), Seq()
+)
+
+// Advance watermark to 15 seconds,
+// but do not process batch
+ms.addData(25)
+q.stop()
--- End diff --

Why don't you want to process the batch? 
Let it process the batch, check whether the results are correct (i.e. 
things were evicted), and then stop. 
Drop the table, restart, call `processAllAvailable`, and check whether the 
same result is recreated.
This will then actually verify that the watermark is recovered and used to 
evict the records again.






[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89424606
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala ---
@@ -96,28 +96,58 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 )
   }
 
-  ignore("recovery") {
-val inputData = MemoryStream[Int]
-
-val windowedAggregation = inputData.toDF()
+  test("recovery") {
+val ms = new MemoryStream[Int](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val tableName = "recovery"
+def startQuery: StreamingQuery = {
+  ms.toDF()
 .withColumn("eventTime", $"value".cast("timestamp"))
 .withWatermark("eventTime", "10 seconds")
 .groupBy(window($"eventTime", "5 seconds") as 'window)
 .agg(count("*") as 'count)
 .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+.writeStream
+.format("memory")
+.queryName(tableName)
+.outputMode("append")
+.start()
+}
 
-testStream(windowedAggregation)(
-  AddData(inputData, 10, 11, 12, 13, 14, 15),
-  CheckAnswer(),
-  AddData(inputData, 25), // Advance watermark to 15 seconds
-  StopStream,
-  StartStream(),
-  CheckAnswer(),
-  AddData(inputData, 25), // Evict items less than previous watermark.
-  StopStream,
-  StartStream(),
-  CheckAnswer((10, 5))
+var q = startQuery
+ms.addData(10, 11, 12, 13, 14, 15)
+q.processAllAvailable()
+
+checkAnswer(
+  spark.table(tableName), Seq()
+)
+
+// Advance watermark to 15 seconds,
+// but do not process batch
+ms.addData(25)
+q.stop()
+
+q = startQuery
+q.processAllAvailable()
+checkAnswer(
+  spark.table(tableName), Seq()
+)
+
+// Evict items less than previous watermark
+ms.addData(25)
+q.processAllAvailable()
+checkAnswer(
--- End diff --

why split into 3 lines?





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89424405
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala ---
@@ -96,28 +96,58 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 )
   }
 
-  ignore("recovery") {
-val inputData = MemoryStream[Int]
-
-val windowedAggregation = inputData.toDF()
+  test("recovery") {
+val ms = new MemoryStream[Int](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val tableName = "recovery"
+def startQuery: StreamingQuery = {
+  ms.toDF()
 .withColumn("eventTime", $"value".cast("timestamp"))
 .withWatermark("eventTime", "10 seconds")
 .groupBy(window($"eventTime", "5 seconds") as 'window)
 .agg(count("*") as 'count)
 .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+.writeStream
+.format("memory")
+.queryName(tableName)
--- End diff --

I think this is not testing it correctly! You are not even setting a 
checkpoint directory, so the offset log is not recovered.





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89403192
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
 ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.streaming.StreamExecutionMetadata
+
+class StreamExecutionMetadataSuite extends SparkFunSuite {
+
+  test("stream execution metadata") {
--- End diff --

nit: the name "stream execution metadata" does not convey anything. Maybe
"json parsing"?





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89414485
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, 
StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("ensure consistent results across batch executions") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+def startQuery: StreamingQuery = {
--- End diff --

add some docs saying why this query was chosen





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89408184
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -235,4 +237,86 @@ class StreamingAggregationSuite extends StreamTest 
with BeforeAndAfterAll {
   CheckLastBatch(("a", 30), ("b", 3), ("c", 1))
 )
   }
+
+  test("prune results by current_time, complete mode") {
+import testImplicits._
+import StreamingAggregationSuite._
+clock = new StreamManualClock
+
+val inputData = MemoryStream[Long]
+
+val aggregated =
+  inputData.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.as[(Long, Long)]
--- End diff --

`as` not needed





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89403372
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -235,4 +237,86 @@ class StreamingAggregationSuite extends StreamTest 
with BeforeAndAfterAll {
   CheckLastBatch(("a", 30), ("b", 3), ("c", 1))
 )
   }
+
+  test("prune results by current_time, complete mode") {
+import testImplicits._
+import StreamingAggregationSuite._
+clock = new StreamManualClock
+
+val inputData = MemoryStream[Long]
+
+val aggregated =
+  inputData.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.as[(Long, Long)]
+.where('value >= current_timestamp().cast("long") - 10L)
+
+testStream(aggregated, Complete)(
+  StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+
+  // advance clock to 10 seconds
+  AddData(inputData, 0L, 5L, 5L, 10L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+
+  // advance clock to 20 seconds, should retain keys >= 10
+  AddData(inputData, 15L, 15L, 20L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+
+  // advance clock to 30 seconds, should retain keys >= 20
+  AddData(inputData, 0L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((20L, 1)),
+
+  // advance clock to 40 seconds, should retain keys >= 30
+  AddData(inputData, 25L, 30L, 40L, 45L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((30L, 1), (40L, 1), (45L, 1))
+)
+  }
+
+  test("prune results by date_time, complete mode") {
--- End diff --

nit: date_time -> current_date





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89415062
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, 
StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("ensure consistent results across batch executions") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
+val t1 = clock.getTimeMillis() + 5000L
--- End diff --

Increase the time added. This assumes that the query will finish within 5
to 10 seconds, which may not be true. Probably add a minute.





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89414600
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, 
StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("ensure consistent results across batch executions") {
--- End diff --

Can you be more specific about what this tests? E.g. "metadata is recovered
from log when query is restarted".





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89239675
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -422,6 +450,12 @@ class StreamExecution(
 val replacementMap = AttributeMap(replacements)
 val triggerLogicalPlan = withNewSources transformAllExpressions {
   case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+  case ct: CurrentTimestamp =>
+
CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestampMillis,
+  ct.dataType)
+  case cd: CurrentDate =>
+
CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestampMillis,
--- End diff --

NVM, I MISSED THE conversion inside `CurrentBatchTimestamp.toLiteral`.
MY BAD!!

But then there are no tests for current_date! So one of the branches in the
above code is not being tested at all.





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89237005
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -422,6 +450,12 @@ class StreamExecution(
 val replacementMap = AttributeMap(replacements)
 val triggerLogicalPlan = withNewSources transformAllExpressions {
   case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+  case ct: CurrentTimestamp =>
+
CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestampMillis,
+  ct.dataType)
+  case cd: CurrentDate =>
+
CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestampMillis,
--- End diff --

Are you sure this is right? From what I see in the tests about how CurrentDate
is expected to be converted to a literal, the millis should be rounded exactly
to the day boundary.


https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L47
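
For illustration, the day-boundary rounding in question can be sketched in
plain Scala. This is a minimal sketch that assumes UTC; the object and helper
names (`BatchDateSketch`, `millisToDaysUtc`) are hypothetical, and this is not
Spark's own conversion, which may also account for the time zone.

```scala
import java.util.concurrent.TimeUnit

object BatchDateSketch {
  private val MillisPerDay: Long = TimeUnit.DAYS.toMillis(1)

  // Hypothetical helper: whole days since 1970-01-01, assuming UTC.
  // floorDiv rounds toward negative infinity, so pre-epoch millis
  // still land on the correct (earlier) day.
  def millisToDaysUtc(millis: Long): Int =
    Math.floorDiv(millis, MillisPerDay).toInt
}
```

Every timestamp within the same UTC day maps to the same day number, which is
the property a date literal built from a batch timestamp would need.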






[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89235928
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
 ---
@@ -26,8 +26,8 @@ class StreamProgress(
 val baseMap: immutable.Map[Source, Offset] = new 
immutable.HashMap[Source, Offset])
   extends scala.collection.immutable.Map[Source, Offset] {
 
-  def toOffsetSeq(source: Seq[Source]): OffsetSeq = {
-OffsetSeq(source.map(get))
+  def toOffsetSeq(source: Seq[Source], metadata: Option[String] = None): 
OffsetSeq = {
--- End diff --

Any need to make the metadata optional? Seems like we would always want to
pass metadata when calling toOffsetSeq, and there are only a couple of usages.
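
A minimal sketch of the required-parameter alternative, using stand-in types
rather than Spark's real `Source`, `Offset` and `OffsetSeq`. Here sources are
plain strings and offsets are plain longs, and the names in `OffsetSeqSketch`
are assumptions made only for illustration.

```scala
object OffsetSeqSketch {
  // Stand-in for the real OffsetSeq: one optional offset per source,
  // plus the serialized metadata string.
  final case class OffsetSeq(offsets: Seq[Option[Long]], metadata: Option[String])

  // metadata has no default value, so every call site must supply it.
  def toOffsetSeq(
      progress: Map[String, Long],
      sources: Seq[String],
      metadata: String): OffsetSeq =
    OffsetSeq(sources.map(progress.get), Some(metadata))
}
```

With only a couple of call sites, dropping the default makes a forgotten
metadata argument a compile error rather than a silently empty field.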





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89237316
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -38,6 +40,27 @@ import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
 /**
+ * Contains metadata associated with a stream execution. This information 
is
+ * persisted to the offset log via the OffsetSeq metadata field. Current
+ * information contained in this object includes:
+ *
+ * @param currentEventTimeWatermarkMillis: The current eventTime 
watermark, used to
+ * bound the lateness of data that will processed. Time unit: milliseconds
+ * @param currentBatchTimestampMillis: The current batch processing 
timestamp.
+ * Time unit: milliseconds
+ */
+case class StreamExecutionMetadata(
+var currentEventTimeWatermarkMillis: Long = 0,
--- End diff --

nit: this is pretty long! How about just `batchWatermarkMs` and
`batchTimestampMs`? It's not really current if a batch is being reprocessed.
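
A rough sketch of how the shorter names could look together with JSON parsing
that defaults missing fields to 0. The names `MetadataSketch`, `BatchMetadata`,
`longField` and `fromJson` are hypothetical, and the regex lookup is only a
stand-in for a real JSON library so that the snippet stays self-contained.

```scala
import java.util.regex.Pattern

object MetadataSketch {
  // Field names follow the suggestion above; both default to 0.
  final case class BatchMetadata(
      batchWatermarkMs: Long = 0L,
      batchTimestampMs: Long = 0L)

  // Hypothetical helper: reads one integer field from a flat JSON object.
  private def longField(json: String, name: String): Option[Long] = {
    val pattern = ("\"" + Pattern.quote(name) + "\"\\s*:\\s*(-?\\d+)").r
    pattern.findFirstMatchIn(json).map(_.group(1).toLong)
  }

  // Fields missing from the JSON fall back to 0.
  def fromJson(json: String): BatchMetadata =
    BatchMetadata(
      batchWatermarkMs = longField(json, "batchWatermarkMs").getOrElse(0L),
      batchTimestampMs = longField(json, "batchTimestampMs").getOrElse(0L))
}
```

For example, `MetadataSketch.fromJson("{}")` yields `BatchMetadata(0, 0)`, so
an offset log entry written without metadata still parses.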





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89235503
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -734,6 +770,13 @@ class StreamExecution(
   case object TERMINATED extends State
 }
 
+object StreamExecutionMetadata {
--- End diff --

Please put the StreamExecutionMetadata class and object next to each other.
I think we generally follow this convention. So I would move the class to
here, after StreamExecution.





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89233977
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -92,8 +116,8 @@ class StreamExecution(
   /** The current batchId or -1 if execution has not yet been initialized. 
*/
   private var currentBatchId: Long = -1
 
-  /** The current eventTime watermark, used to bound the lateness of data 
that will processed. */
-  private var currentEventTimeWatermark: Long = 0
+  /** stream execution metadata */
--- End diff --

nit: s -> S





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89234438
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -288,7 +312,9 @@ class StreamExecution(
 logInfo(s"Resuming streaming query, starting with batch $batchId")
 currentBatchId = batchId
 availableOffsets = nextOffsets.toStreamProgress(sources)
-logDebug(s"Found possibly uncommitted offsets $availableOffsets")
+streamExecutionMetadata = 
StreamExecutionMetadata(nextOffsets.metadata.getOrElse("{}"))
+logDebug(s"Found possibly uncommitted offsets $availableOffsets " +
--- End diff --

nit: uncommitted -> unprocessed. "uncommitted" is confusing to me: is it not
committed to the log or to the sink?





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89238744
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -235,4 +236,49 @@ class StreamingAggregationSuite extends StreamTest 
with BeforeAndAfterAll {
   CheckLastBatch(("a", 30), ("b", 3), ("c", 1))
 )
   }
+
+  test("prune results by time, complete mode") {
+import testImplicits._
+import StreamingAggregationSuite._
+clock = new StreamManualClock
+
+val inputData = MemoryStream[Long]
+
+val aggregated =
+  inputData.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.as[(Long, Long)]
+.where('value >= current_timestamp().cast("long") - 10L)
+
+testStream(aggregated, Complete)(
+  StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+
+  // advance clock to 10 seconds
+  AddData(inputData, 0L, 5L, 5L, 10L),
+  AdvanceManualClock(10 * 1000),
+  AssertOnQuery { _ => clock.getTimeMillis() === 10 * 1000 },
+  CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+
+  // advance clock to 20 seconds
--- End diff --

nice to add `// advance clock to 20 seconds, should retain only keys >= 10`
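
To make the expected rows at each trigger easier to check, here is a small
plain-Scala model of the query in the diff: counts per key over all input seen
so far, filtered by `value >= now - 10`. It is only a sketch of the arithmetic;
`PruneSketch` and `lastBatch` are hypothetical names, not part of StreamTest.

```scala
object PruneSketch {
  // Complete mode re-emits every aggregate on each trigger, so the filter
  // applies to the full running counts, not only to newly arrived keys.
  def lastBatch(allInput: Seq[Long], nowSeconds: Long): Seq[(Long, Long)] =
    allInput
      .groupBy(identity)
      .toSeq
      .map { case (key, values) => (key, values.size.toLong) }
      .filter { case (key, _) => key >= nowSeconds - 10L }
      .sortBy(_._1)
}
```

With the inputs from the test, the cutoff is 0 at 10 seconds, 10 at 20 seconds,
20 at 30 seconds and 30 at 40 seconds.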


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89237532
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -235,4 +236,49 @@ class StreamingAggregationSuite extends StreamTest 
with BeforeAndAfterAll {
   CheckLastBatch(("a", 30), ("b", 3), ("c", 1))
 )
   }
+
+  test("prune results by time, complete mode") {
+import testImplicits._
+import StreamingAggregationSuite._
+clock = new StreamManualClock
+
+val inputData = MemoryStream[Long]
+
+val aggregated =
+  inputData.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.as[(Long, Long)]
+.where('value >= current_timestamp().cast("long") - 10L)
+
+testStream(aggregated, Complete)(
+  StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+
+  // advance clock to 10 seconds
+  AddData(inputData, 0L, 5L, 5L, 10L),
+  AdvanceManualClock(10 * 1000),
+  AssertOnQuery { _ => clock.getTimeMillis() === 10 * 1000 },
--- End diff --

nit: unnecessary, AdvanceManualClock already tests this internally


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89237754
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -235,4 +236,49 @@ class StreamingAggregationSuite extends StreamTest 
with BeforeAndAfterAll {
   CheckLastBatch(("a", 30), ("b", 3), ("c", 1))
 )
   }
+
+  test("prune results by time, complete mode") {
+import testImplicits._
+import StreamingAggregationSuite._
+clock = new StreamManualClock
+
+val inputData = MemoryStream[Long]
+
+val aggregated =
+  inputData.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.as[(Long, Long)]
+.where('value >= current_timestamp().cast("long") - 10L)
+
+testStream(aggregated, Complete)(
+  StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+
+  // advance clock to 10 seconds
+  AddData(inputData, 0L, 5L, 5L, 10L),
+  AdvanceManualClock(10 * 1000),
+  AssertOnQuery { _ => clock.getTimeMillis() === 10 * 1000 },
+  CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+
+  // advance clock to 20 seconds
+  AddData(inputData, 15L, 15L, 20L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+
+  // advance clock to 30 seconds
+  AddData(inputData, 0L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((20L, 1)),
+
+  // advance clock to 40 seconds
+  AddData(inputData, 25L, 30L, 40L, 45L),
+  AdvanceManualClock(10 * 1000),
+  CheckLastBatch((30L, 1), (40L, 1), (45L, 1))
--- End diff --

I don't see how this tests whether the watermark and timestamp are recovered
correctly from the log. Nothing is being stopped.

I think there should be another test that actually stops the stream, drops
the complete mode table, restarts it from the checkpoint, and checks that the
table gets recreated with the same values.

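Something like:
```
// Rough sketch: the aggregation is elided; cpPath and expectedResults
// are placeholders.
val query = inputData.toDF()
  .writeStream
  .format("memory")
  .outputMode("complete")
  .queryName("aggregates")
  .option("checkpointLocation", cpPath)
  .start()
inputData.addData(/* test data */)
query.processAllAvailable()
checkAnswer(spark.table("aggregates"), expectedResults)
query.stop()

// drop the in-memory table and move the clock while the query is down
spark.catalog.dropTempView("aggregates")
clock.advance(100)

val query2 = inputData.toDF()
  .writeStream
  .format("memory")
  .outputMode("complete")
  .queryName("aggregates")
  .option("checkpointLocation", cpPath)
  .start()
query2.processAllAvailable()
// verify that increasing the clock while the query was down does not
// change the result of the last batch
checkAnswer(spark.table("aggregates"), expectedResults)
```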
See this for more inspiration - 
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala#L472
 

