[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structure streaming

2020-10-09 Thread Sean R. Owen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17211294#comment-17211294
 ] 

Sean R. Owen commented on SPARK-33039:
--

Yeah I think that's an OK resolution. Thanks for resolving Sandish.

> Misleading watermark calculation in structure streaming
> ---
>
> Key: SPARK-33039
> URL: https://issues.apache.org/jira/browse/SPARK-33039
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
>Reporter: Sandish Kumar HN
>Priority: Major
>
> source code:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.hadoop.fs.Path
> import java.sql.Timestamp
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
> object TestWaterMark extends App {
>  val spark = SparkSession.builder().master("local").getOrCreate()
>  val sc = spark.sparkContext
>  val dir = new Path("/tmp/test-structured-streaming")
>  val fs = dir.getFileSystem(sc.hadoopConfiguration)
>  fs.mkdirs(dir)
>  val schema = StructType(StructField("vilue", StringType) ::
>  StructField("timestamp", TimestampType) ::
>  Nil)
>  val eventStream = spark
>  .readStream
>  .option("sep", ";")
>  .option("header", "false")
>  .schema(schema)
>  .csv(dir.toString)
>  // Watermarked aggregation
>  val eventsCount = eventStream
>  .withWatermark("timestamp", "5 seconds")
>  .groupBy(window(col("timestamp"), "10 seconds"))
>  .count
>  def writeFile(path: Path, data: String) {
>  val file = fs.create(path)
>  file.writeUTF(data)
>  file.close()
>  }
>  // Debug query
>  val query = eventsCount.writeStream
>  .format("console")
>  .outputMode("complete")
>  .option("truncate", "false")
>  .trigger(Trigger.ProcessingTime("5 seconds"))
>  .start()
>  writeFile(new Path(dir, "file1"), """
>  |OLD;2019-08-09 10:05:00
>  |OLD;2019-08-09 10:10:00
>  |OLD;2019-08-09 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp1 = query.lastProgress
>  println(lp1.eventTime)
>  writeFile(new Path(dir, "file2"), """
>  |NEW;2020-08-29 10:05:00
>  |NEW;2020-08-29 10:10:00
>  |NEW;2020-08-29 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp2 = query.lastProgress
>  println(lp2.eventTime)
>  writeFile(new Path(dir, "file4"), """
>  |OLD;2017-08-10 10:05:00
>  |OLD;2017-08-10 10:10:00
>  |OLD;2017-08-10 10:15:00""".stripMargin)
>  writeFile(new Path(dir, "file3"), "")
>  query.processAllAvailable()
>  val lp3 = query.lastProgress
>  println(lp3.eventTime)
>  query.awaitTermination()
>  fs.delete(dir, true)
> }
> {code}
> OUTPUT:
>  
> {code:java}
> ---
> Batch: 0
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +--+-+
> {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, 
> watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z}
> ---
> Batch: 1
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +--+-+
> {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, 
> watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z}
> ---
> Batch: 2
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 |
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2017-08-10 10:10:00, 2017-08-10 10:10:10]|1 |
> |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +--+-+
> {min=2017-08-10T17:05:00.000Z, avg=2017-08-10T17:10:00.000Z, 
> watermark=2020-08-29T17:14:55.000Z, max=2017-08-10T17:15:00.000Z}
> {code}
> 

[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structure streaming

2020-10-09 Thread Aoyuan Liao (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17211288#comment-17211288
 ] 

Aoyuan Liao commented on SPARK-33039:
-

[~srowen] This is actually not a bug. The user didn't fully understand the 
documenation. The output is correct as what we intended. 
Can we mark it as "not a problem"?

> Misleading watermark calculation in structure streaming
> ---
>
> Key: SPARK-33039
> URL: https://issues.apache.org/jira/browse/SPARK-33039
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
>Reporter: Sandish Kumar HN
>Priority: Major
>
> source code:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.hadoop.fs.Path
> import java.sql.Timestamp
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
> object TestWaterMark extends App {
>  val spark = SparkSession.builder().master("local").getOrCreate()
>  val sc = spark.sparkContext
>  val dir = new Path("/tmp/test-structured-streaming")
>  val fs = dir.getFileSystem(sc.hadoopConfiguration)
>  fs.mkdirs(dir)
>  val schema = StructType(StructField("vilue", StringType) ::
>  StructField("timestamp", TimestampType) ::
>  Nil)
>  val eventStream = spark
>  .readStream
>  .option("sep", ";")
>  .option("header", "false")
>  .schema(schema)
>  .csv(dir.toString)
>  // Watermarked aggregation
>  val eventsCount = eventStream
>  .withWatermark("timestamp", "5 seconds")
>  .groupBy(window(col("timestamp"), "10 seconds"))
>  .count
>  def writeFile(path: Path, data: String) {
>  val file = fs.create(path)
>  file.writeUTF(data)
>  file.close()
>  }
>  // Debug query
>  val query = eventsCount.writeStream
>  .format("console")
>  .outputMode("complete")
>  .option("truncate", "false")
>  .trigger(Trigger.ProcessingTime("5 seconds"))
>  .start()
>  writeFile(new Path(dir, "file1"), """
>  |OLD;2019-08-09 10:05:00
>  |OLD;2019-08-09 10:10:00
>  |OLD;2019-08-09 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp1 = query.lastProgress
>  println(lp1.eventTime)
>  writeFile(new Path(dir, "file2"), """
>  |NEW;2020-08-29 10:05:00
>  |NEW;2020-08-29 10:10:00
>  |NEW;2020-08-29 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp2 = query.lastProgress
>  println(lp2.eventTime)
>  writeFile(new Path(dir, "file4"), """
>  |OLD;2017-08-10 10:05:00
>  |OLD;2017-08-10 10:10:00
>  |OLD;2017-08-10 10:15:00""".stripMargin)
>  writeFile(new Path(dir, "file3"), "")
>  query.processAllAvailable()
>  val lp3 = query.lastProgress
>  println(lp3.eventTime)
>  query.awaitTermination()
>  fs.delete(dir, true)
> }
> {code}
> OUTPUT:
>  
> {code:java}
> ---
> Batch: 0
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +--+-+
> {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, 
> watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z}
> ---
> Batch: 1
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +--+-+
> {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, 
> watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z}
> ---
> Batch: 2
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 |
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2017-08-10 10:10:00, 2017-08-10 10:10:10]|1 |
> |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +--+-+
> {min=2017-08-10T17:05:00.000Z, 

[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structure streaming

2020-10-06 Thread Aoyuan Liao (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17209030#comment-17209030
 ] 

Aoyuan Liao commented on SPARK-33039:
-

Since the window: [2020-08-29 10:15:00, 2020-08-29 10:15:10] ended after the 
watermark: 2020-08-29T17:14:55.000, the partial count maintained as internal 
state while waiting for later data, so not yet added to the result table. 
Please take a deeper look at the append mode example shown in the 
documentation: 
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
 

> Misleading watermark calculation in structure streaming
> ---
>
> Key: SPARK-33039
> URL: https://issues.apache.org/jira/browse/SPARK-33039
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
>Reporter: Sandish Kumar HN
>Priority: Major
>
> source code:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.hadoop.fs.Path
> import java.sql.Timestamp
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
> object TestWaterMark extends App {
>  val spark = SparkSession.builder().master("local").getOrCreate()
>  val sc = spark.sparkContext
>  val dir = new Path("/tmp/test-structured-streaming")
>  val fs = dir.getFileSystem(sc.hadoopConfiguration)
>  fs.mkdirs(dir)
>  val schema = StructType(StructField("vilue", StringType) ::
>  StructField("timestamp", TimestampType) ::
>  Nil)
>  val eventStream = spark
>  .readStream
>  .option("sep", ";")
>  .option("header", "false")
>  .schema(schema)
>  .csv(dir.toString)
>  // Watermarked aggregation
>  val eventsCount = eventStream
>  .withWatermark("timestamp", "5 seconds")
>  .groupBy(window(col("timestamp"), "10 seconds"))
>  .count
>  def writeFile(path: Path, data: String) {
>  val file = fs.create(path)
>  file.writeUTF(data)
>  file.close()
>  }
>  // Debug query
>  val query = eventsCount.writeStream
>  .format("console")
>  .outputMode("complete")
>  .option("truncate", "false")
>  .trigger(Trigger.ProcessingTime("5 seconds"))
>  .start()
>  writeFile(new Path(dir, "file1"), """
>  |OLD;2019-08-09 10:05:00
>  |OLD;2019-08-09 10:10:00
>  |OLD;2019-08-09 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp1 = query.lastProgress
>  println(lp1.eventTime)
>  writeFile(new Path(dir, "file2"), """
>  |NEW;2020-08-29 10:05:00
>  |NEW;2020-08-29 10:10:00
>  |NEW;2020-08-29 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp2 = query.lastProgress
>  println(lp2.eventTime)
>  writeFile(new Path(dir, "file4"), """
>  |OLD;2017-08-10 10:05:00
>  |OLD;2017-08-10 10:10:00
>  |OLD;2017-08-10 10:15:00""".stripMargin)
>  writeFile(new Path(dir, "file3"), "")
>  query.processAllAvailable()
>  val lp3 = query.lastProgress
>  println(lp3.eventTime)
>  query.awaitTermination()
>  fs.delete(dir, true)
> }
> {code}
> OUTPUT:
>  
> {code:java}
> ---
> Batch: 0
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +--+-+
> {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, 
> watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z}
> ---
> Batch: 1
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +--+-+
> {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, 
> watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z}
> ---
> Batch: 2
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 |
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2017-08-10 10:10:00, 2017-08-10 

[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structure streaming

2020-10-06 Thread Sandish Kumar HN (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17208674#comment-17208674
 ] 

Sandish Kumar HN commented on SPARK-33039:
--

[~EveLiao] I had made the change as output mode to append.

 I do see the last old records been getting dropped as expected.
OLD;2017-08-10 10:05:00
OLD;2017-08-10 10:10:00
OLD;2017-08-10 10:15:00"

but I also see new records getting dropped. may I know why?


NEW;2020-08-29 10:05:00
NEW;2020-08-29 10:10:00
NEW;2020-08-29 10:15:00

this record is getting dropped: 2020-08-29 10:15:00. please look at the output 
for the append mode. 

 
{code:java}
---
Batch: 0
---
+--+-+
|window|count|
+--+-+
+--+-+

---
Batch: 1
---
+--+-+
|window |count|
+--+-+
|[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
|[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
+--+-+

{watermark=2019-08-09T17:14:55.000Z}
---
Batch: 2
---
+--+-+
|window|count|
+--+-+
+--+-+

---
Batch: 3
---
+--+-+
|window |count|
+--+-+
|[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
|[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
|[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
+--+-+

{watermark=2020-08-29T17:14:55.000Z}
---
Batch: 4
---
+--+-+
|window|count|
+--+-+
+--+-+

{min=2017-08-10T17:05:00.000Z, avg=2017-08-10T17:10:00.000Z, 
watermark=2020-08-29T17:14:55.000Z, max=2017-08-10T17:15:00.000Z}
{code}
 

> Misleading watermark calculation in structure streaming
> ---
>
> Key: SPARK-33039
> URL: https://issues.apache.org/jira/browse/SPARK-33039
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
>Reporter: Sandish Kumar HN
>Priority: Major
>
> source code:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.hadoop.fs.Path
> import java.sql.Timestamp
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
> object TestWaterMark extends App {
>  val spark = SparkSession.builder().master("local").getOrCreate()
>  val sc = spark.sparkContext
>  val dir = new Path("/tmp/test-structured-streaming")
>  val fs = dir.getFileSystem(sc.hadoopConfiguration)
>  fs.mkdirs(dir)
>  val schema = StructType(StructField("vilue", StringType) ::
>  StructField("timestamp", TimestampType) ::
>  Nil)
>  val eventStream = spark
>  .readStream
>  .option("sep", ";")
>  .option("header", "false")
>  .schema(schema)
>  .csv(dir.toString)
>  // Watermarked aggregation
>  val eventsCount = eventStream
>  .withWatermark("timestamp", "5 seconds")
>  .groupBy(window(col("timestamp"), "10 seconds"))
>  .count
>  def writeFile(path: Path, data: String) {
>  val file = fs.create(path)
>  file.writeUTF(data)
>  file.close()
>  }
>  // Debug query
>  val query = eventsCount.writeStream
>  .format("console")
>  .outputMode("complete")
>  .option("truncate", "false")
>  .trigger(Trigger.ProcessingTime("5 seconds"))
>  .start()
>  writeFile(new Path(dir, "file1"), """
>  |OLD;2019-08-09 10:05:00
>  |OLD;2019-08-09 10:10:00
>  |OLD;2019-08-09 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp1 = query.lastProgress
>  println(lp1.eventTime)
>  writeFile(new Path(dir, "file2"), """
>  |NEW;2020-08-29 10:05:00
>  |NEW;2020-08-29 10:10:00
>  |NEW;2020-08-29 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp2 = query.lastProgress
>  println(lp2.eventTime)
>  writeFile(new Path(dir, "file4"), """
>  |OLD;2017-08-10 10:05:00
>  |OLD;2017-08-10 10:10:00
>  |OLD;2017-08-10 10:15:00""".stripMargin)
>  writeFile(new Path(dir, "file3"), "")
>  query.processAllAvailable()
>  val lp3 = query.lastProgress
>  println(lp3.eventTime)
>  query.awaitTermination()
>  fs.delete(dir, true)
> }
> {code}
> OUTPUT:
>  
> {code:java}
> ---
> Batch: 0
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2019-08-09 10:15:00, 

[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structure streaming

2020-10-05 Thread Aoyuan Liao (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17208409#comment-17208409
 ] 

Aoyuan Liao commented on SPARK-33039:
-

It is not a bug. It is documented that

"

It is important to note that the following conditions must be satisfied for the 
watermarking to clean the state in aggregation queries _(as of Spark 2.1.1, 
subject to change in the future)_.
 * *Output mode must be Append or Update*

" in 
[https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking.]

In your case, the data is output in complete mode so all aggregated data are 
preserved.

> Misleading watermark calculation in structure streaming
> ---
>
> Key: SPARK-33039
> URL: https://issues.apache.org/jira/browse/SPARK-33039
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
>Reporter: Sandish Kumar HN
>Priority: Major
>
> source code:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.hadoop.fs.Path
> import java.sql.Timestamp
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
> object TestWaterMark extends App {
>  val spark = SparkSession.builder().master("local").getOrCreate()
>  val sc = spark.sparkContext
>  val dir = new Path("/tmp/test-structured-streaming")
>  val fs = dir.getFileSystem(sc.hadoopConfiguration)
>  fs.mkdirs(dir)
>  val schema = StructType(StructField("vilue", StringType) ::
>  StructField("timestamp", TimestampType) ::
>  Nil)
>  val eventStream = spark
>  .readStream
>  .option("sep", ";")
>  .option("header", "false")
>  .schema(schema)
>  .csv(dir.toString)
>  // Watermarked aggregation
>  val eventsCount = eventStream
>  .withWatermark("timestamp", "5 seconds")
>  .groupBy(window(col("timestamp"), "10 seconds"))
>  .count
>  def writeFile(path: Path, data: String) {
>  val file = fs.create(path)
>  file.writeUTF(data)
>  file.close()
>  }
>  // Debug query
>  val query = eventsCount.writeStream
>  .format("console")
>  .outputMode("complete")
>  .option("truncate", "false")
>  .trigger(Trigger.ProcessingTime("5 seconds"))
>  .start()
>  writeFile(new Path(dir, "file1"), """
>  |OLD;2019-08-09 10:05:00
>  |OLD;2019-08-09 10:10:00
>  |OLD;2019-08-09 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp1 = query.lastProgress
>  println(lp1.eventTime)
>  writeFile(new Path(dir, "file2"), """
>  |NEW;2020-08-29 10:05:00
>  |NEW;2020-08-29 10:10:00
>  |NEW;2020-08-29 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp2 = query.lastProgress
>  println(lp2.eventTime)
>  writeFile(new Path(dir, "file4"), """
>  |OLD;2017-08-10 10:05:00
>  |OLD;2017-08-10 10:10:00
>  |OLD;2017-08-10 10:15:00""".stripMargin)
>  writeFile(new Path(dir, "file3"), "")
>  query.processAllAvailable()
>  val lp3 = query.lastProgress
>  println(lp3.eventTime)
>  query.awaitTermination()
>  fs.delete(dir, true)
> }
> {code}
> OUTPUT:
>  
> {code:java}
> ---
> Batch: 0
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +--+-+
> {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, 
> watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z}
> ---
> Batch: 1
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +--+-+
> {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, 
> watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z}
> ---
> Batch: 2
> ---
> +--+-+
> |window |count|
> +--+-+
> |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 |
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00,