[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structured streaming
[ https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17211294#comment-17211294 ] Sean R. Owen commented on SPARK-33039: -- Yeah I think that's an OK resolution. Thanks for resolving Sandish. > Misleading watermark calculation in structure streaming > --- > > Key: SPARK-33039 > URL: https://issues.apache.org/jira/browse/SPARK-33039 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.4 >Reporter: Sandish Kumar HN >Priority: Major > > source code: > {code:java} > import org.apache.spark.sql.SparkSession > import org.apache.hadoop.fs.Path > import java.sql.Timestamp > import org.apache.spark.sql.types._ > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} > object TestWaterMark extends App { > val spark = SparkSession.builder().master("local").getOrCreate() > val sc = spark.sparkContext > val dir = new Path("/tmp/test-structured-streaming") > val fs = dir.getFileSystem(sc.hadoopConfiguration) > fs.mkdirs(dir) > val schema = StructType(StructField("vilue", StringType) :: > StructField("timestamp", TimestampType) :: > Nil) > val eventStream = spark > .readStream > .option("sep", ";") > .option("header", "false") > .schema(schema) > .csv(dir.toString) > // Watermarked aggregation > val eventsCount = eventStream > .withWatermark("timestamp", "5 seconds") > .groupBy(window(col("timestamp"), "10 seconds")) > .count > def writeFile(path: Path, data: String) { > val file = fs.create(path) > file.writeUTF(data) > file.close() > } > // Debug query > val query = eventsCount.writeStream > .format("console") > .outputMode("complete") > .option("truncate", "false") > .trigger(Trigger.ProcessingTime("5 seconds")) > .start() > writeFile(new Path(dir, "file1"), """ > |OLD;2019-08-09 10:05:00 > |OLD;2019-08-09 10:10:00 > |OLD;2019-08-09 10:15:00""".stripMargin) > query.processAllAvailable() > val lp1 = query.lastProgress > 
println(lp1.eventTime) > writeFile(new Path(dir, "file2"), """ > |NEW;2020-08-29 10:05:00 > |NEW;2020-08-29 10:10:00 > |NEW;2020-08-29 10:15:00""".stripMargin) > query.processAllAvailable() > val lp2 = query.lastProgress > println(lp2.eventTime) > writeFile(new Path(dir, "file4"), """ > |OLD;2017-08-10 10:05:00 > |OLD;2017-08-10 10:10:00 > |OLD;2017-08-10 10:15:00""".stripMargin) > writeFile(new Path(dir, "file3"), "") > query.processAllAvailable() > val lp3 = query.lastProgress > println(lp3.eventTime) > query.awaitTermination() > fs.delete(dir, true) > } > {code} > OUTPUT: > > {code:java} > --- > Batch: 0 > --- > +--+-+ > |window |count| > +--+-+ > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +--+-+ > {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, > watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z} > --- > Batch: 1 > --- > +--+-+ > |window |count| > +--+-+ > |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | > |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +--+-+ > {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, > watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z} > --- > Batch: 2 > --- > +--+-+ > |window |count| > +--+-+ > |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 | > |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | > |[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 | > |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2017-08-10 10:10:00, 2017-08-10 10:10:10]|1 | > |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +--+-+ > {min=2017-08-10T17:05:00.000Z, 
avg=2017-08-10T17:10:00.000Z, > watermark=2020-08-29T17:14:55.000Z, max=2017-08-10T17:15:00.000Z} > {code} >
[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structured streaming
[ https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17211288#comment-17211288 ] Aoyuan Liao commented on SPARK-33039: - [~srowen] This is actually not a bug. The user didn't fully understand the documenation. The output is correct as what we intended. Can we mark it as "not a problem"? > Misleading watermark calculation in structure streaming > --- > > Key: SPARK-33039 > URL: https://issues.apache.org/jira/browse/SPARK-33039 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.4 >Reporter: Sandish Kumar HN >Priority: Major > > source code: > {code:java} > import org.apache.spark.sql.SparkSession > import org.apache.hadoop.fs.Path > import java.sql.Timestamp > import org.apache.spark.sql.types._ > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} > object TestWaterMark extends App { > val spark = SparkSession.builder().master("local").getOrCreate() > val sc = spark.sparkContext > val dir = new Path("/tmp/test-structured-streaming") > val fs = dir.getFileSystem(sc.hadoopConfiguration) > fs.mkdirs(dir) > val schema = StructType(StructField("vilue", StringType) :: > StructField("timestamp", TimestampType) :: > Nil) > val eventStream = spark > .readStream > .option("sep", ";") > .option("header", "false") > .schema(schema) > .csv(dir.toString) > // Watermarked aggregation > val eventsCount = eventStream > .withWatermark("timestamp", "5 seconds") > .groupBy(window(col("timestamp"), "10 seconds")) > .count > def writeFile(path: Path, data: String) { > val file = fs.create(path) > file.writeUTF(data) > file.close() > } > // Debug query > val query = eventsCount.writeStream > .format("console") > .outputMode("complete") > .option("truncate", "false") > .trigger(Trigger.ProcessingTime("5 seconds")) > .start() > writeFile(new Path(dir, "file1"), """ > |OLD;2019-08-09 10:05:00 > |OLD;2019-08-09 10:10:00 > 
|OLD;2019-08-09 10:15:00""".stripMargin) > query.processAllAvailable() > val lp1 = query.lastProgress > println(lp1.eventTime) > writeFile(new Path(dir, "file2"), """ > |NEW;2020-08-29 10:05:00 > |NEW;2020-08-29 10:10:00 > |NEW;2020-08-29 10:15:00""".stripMargin) > query.processAllAvailable() > val lp2 = query.lastProgress > println(lp2.eventTime) > writeFile(new Path(dir, "file4"), """ > |OLD;2017-08-10 10:05:00 > |OLD;2017-08-10 10:10:00 > |OLD;2017-08-10 10:15:00""".stripMargin) > writeFile(new Path(dir, "file3"), "") > query.processAllAvailable() > val lp3 = query.lastProgress > println(lp3.eventTime) > query.awaitTermination() > fs.delete(dir, true) > } > {code} > OUTPUT: > > {code:java} > --- > Batch: 0 > --- > +--+-+ > |window |count| > +--+-+ > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +--+-+ > {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, > watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z} > --- > Batch: 1 > --- > +--+-+ > |window |count| > +--+-+ > |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | > |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +--+-+ > {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, > watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z} > --- > Batch: 2 > --- > +--+-+ > |window |count| > +--+-+ > |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 | > |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | > |[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 | > |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2017-08-10 10:10:00, 2017-08-10 10:10:10]|1 | > |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 
10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +--+-+ > {min=2017-08-10T17:05:00.000Z,
[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structured streaming
[ https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17209030#comment-17209030 ] Aoyuan Liao commented on SPARK-33039: - Since the window: [2020-08-29 10:15:00, 2020-08-29 10:15:10] ended after the watermark: 2020-08-29T17:14:55.000, the partial count maintained as internal state while waiting for later data, so not yet added to the result table. Please take a deeper look at the append mode example shown in the documentation: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking > Misleading watermark calculation in structure streaming > --- > > Key: SPARK-33039 > URL: https://issues.apache.org/jira/browse/SPARK-33039 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.4 >Reporter: Sandish Kumar HN >Priority: Major > > source code: > {code:java} > import org.apache.spark.sql.SparkSession > import org.apache.hadoop.fs.Path > import java.sql.Timestamp > import org.apache.spark.sql.types._ > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} > object TestWaterMark extends App { > val spark = SparkSession.builder().master("local").getOrCreate() > val sc = spark.sparkContext > val dir = new Path("/tmp/test-structured-streaming") > val fs = dir.getFileSystem(sc.hadoopConfiguration) > fs.mkdirs(dir) > val schema = StructType(StructField("vilue", StringType) :: > StructField("timestamp", TimestampType) :: > Nil) > val eventStream = spark > .readStream > .option("sep", ";") > .option("header", "false") > .schema(schema) > .csv(dir.toString) > // Watermarked aggregation > val eventsCount = eventStream > .withWatermark("timestamp", "5 seconds") > .groupBy(window(col("timestamp"), "10 seconds")) > .count > def writeFile(path: Path, data: String) { > val file = fs.create(path) > file.writeUTF(data) > file.close() > } > // Debug query > val query = 
eventsCount.writeStream > .format("console") > .outputMode("complete") > .option("truncate", "false") > .trigger(Trigger.ProcessingTime("5 seconds")) > .start() > writeFile(new Path(dir, "file1"), """ > |OLD;2019-08-09 10:05:00 > |OLD;2019-08-09 10:10:00 > |OLD;2019-08-09 10:15:00""".stripMargin) > query.processAllAvailable() > val lp1 = query.lastProgress > println(lp1.eventTime) > writeFile(new Path(dir, "file2"), """ > |NEW;2020-08-29 10:05:00 > |NEW;2020-08-29 10:10:00 > |NEW;2020-08-29 10:15:00""".stripMargin) > query.processAllAvailable() > val lp2 = query.lastProgress > println(lp2.eventTime) > writeFile(new Path(dir, "file4"), """ > |OLD;2017-08-10 10:05:00 > |OLD;2017-08-10 10:10:00 > |OLD;2017-08-10 10:15:00""".stripMargin) > writeFile(new Path(dir, "file3"), "") > query.processAllAvailable() > val lp3 = query.lastProgress > println(lp3.eventTime) > query.awaitTermination() > fs.delete(dir, true) > } > {code} > OUTPUT: > > {code:java} > --- > Batch: 0 > --- > +--+-+ > |window |count| > +--+-+ > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +--+-+ > {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, > watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z} > --- > Batch: 1 > --- > +--+-+ > |window |count| > +--+-+ > |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | > |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +--+-+ > {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, > watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z} > --- > Batch: 2 > --- > +--+-+ > |window |count| > +--+-+ > |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 | > |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | > |[2017-08-10 10:05:00, 
2017-08-10 10:05:10]|1 | > |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2017-08-10 10:10:00, 2017-08-10
[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structured streaming
[ https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17208674#comment-17208674 ] Sandish Kumar HN commented on SPARK-33039: -- [~EveLiao] I had made the change as output mode to append. I do see the last old records been getting dropped as expected. OLD;2017-08-10 10:05:00 OLD;2017-08-10 10:10:00 OLD;2017-08-10 10:15:00" but I also see new records getting dropped. may I know why? NEW;2020-08-29 10:05:00 NEW;2020-08-29 10:10:00 NEW;2020-08-29 10:15:00 this record is getting dropped: 2020-08-29 10:15:00. please look at the output for the append mode. {code:java} --- Batch: 0 --- +--+-+ |window|count| +--+-+ +--+-+ --- Batch: 1 --- +--+-+ |window |count| +--+-+ |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | +--+-+ {watermark=2019-08-09T17:14:55.000Z} --- Batch: 2 --- +--+-+ |window|count| +--+-+ +--+-+ --- Batch: 3 --- +--+-+ |window |count| +--+-+ |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 | |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | +--+-+ {watermark=2020-08-29T17:14:55.000Z} --- Batch: 4 --- +--+-+ |window|count| +--+-+ +--+-+ {min=2017-08-10T17:05:00.000Z, avg=2017-08-10T17:10:00.000Z, watermark=2020-08-29T17:14:55.000Z, max=2017-08-10T17:15:00.000Z} {code} > Misleading watermark calculation in structure streaming > --- > > Key: SPARK-33039 > URL: https://issues.apache.org/jira/browse/SPARK-33039 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.4 >Reporter: Sandish Kumar HN >Priority: Major > > source code: > {code:java} > import org.apache.spark.sql.SparkSession > import org.apache.hadoop.fs.Path > import java.sql.Timestamp > import org.apache.spark.sql.types._ > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} > object TestWaterMark extends App { > val spark = 
SparkSession.builder().master("local").getOrCreate() > val sc = spark.sparkContext > val dir = new Path("/tmp/test-structured-streaming") > val fs = dir.getFileSystem(sc.hadoopConfiguration) > fs.mkdirs(dir) > val schema = StructType(StructField("vilue", StringType) :: > StructField("timestamp", TimestampType) :: > Nil) > val eventStream = spark > .readStream > .option("sep", ";") > .option("header", "false") > .schema(schema) > .csv(dir.toString) > // Watermarked aggregation > val eventsCount = eventStream > .withWatermark("timestamp", "5 seconds") > .groupBy(window(col("timestamp"), "10 seconds")) > .count > def writeFile(path: Path, data: String) { > val file = fs.create(path) > file.writeUTF(data) > file.close() > } > // Debug query > val query = eventsCount.writeStream > .format("console") > .outputMode("complete") > .option("truncate", "false") > .trigger(Trigger.ProcessingTime("5 seconds")) > .start() > writeFile(new Path(dir, "file1"), """ > |OLD;2019-08-09 10:05:00 > |OLD;2019-08-09 10:10:00 > |OLD;2019-08-09 10:15:00""".stripMargin) > query.processAllAvailable() > val lp1 = query.lastProgress > println(lp1.eventTime) > writeFile(new Path(dir, "file2"), """ > |NEW;2020-08-29 10:05:00 > |NEW;2020-08-29 10:10:00 > |NEW;2020-08-29 10:15:00""".stripMargin) > query.processAllAvailable() > val lp2 = query.lastProgress > println(lp2.eventTime) > writeFile(new Path(dir, "file4"), """ > |OLD;2017-08-10 10:05:00 > |OLD;2017-08-10 10:10:00 > |OLD;2017-08-10 10:15:00""".stripMargin) > writeFile(new Path(dir, "file3"), "") > query.processAllAvailable() > val lp3 = query.lastProgress > println(lp3.eventTime) > query.awaitTermination() > fs.delete(dir, true) > } > {code} > OUTPUT: > > {code:java} > --- > Batch: 0 > --- > +--+-+ > |window |count| > +--+-+ > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2019-08-09 10:15:00,
[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structured streaming
[ https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17208409#comment-17208409 ] Aoyuan Liao commented on SPARK-33039: - It is not a bug. It is documented that " It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries _(as of Spark 2.1.1, subject to change in the future)_. * *Output mode must be Append or Update* " in [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking.] In your case, the data is output in complete mode so all aggregated data are preserved. > Misleading watermark calculation in structure streaming > --- > > Key: SPARK-33039 > URL: https://issues.apache.org/jira/browse/SPARK-33039 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.4 >Reporter: Sandish Kumar HN >Priority: Major > > source code: > {code:java} > import org.apache.spark.sql.SparkSession > import org.apache.hadoop.fs.Path > import java.sql.Timestamp > import org.apache.spark.sql.types._ > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} > object TestWaterMark extends App { > val spark = SparkSession.builder().master("local").getOrCreate() > val sc = spark.sparkContext > val dir = new Path("/tmp/test-structured-streaming") > val fs = dir.getFileSystem(sc.hadoopConfiguration) > fs.mkdirs(dir) > val schema = StructType(StructField("vilue", StringType) :: > StructField("timestamp", TimestampType) :: > Nil) > val eventStream = spark > .readStream > .option("sep", ";") > .option("header", "false") > .schema(schema) > .csv(dir.toString) > // Watermarked aggregation > val eventsCount = eventStream > .withWatermark("timestamp", "5 seconds") > .groupBy(window(col("timestamp"), "10 seconds")) > .count > def writeFile(path: Path, data: String) { > val file = fs.create(path) > 
file.writeUTF(data) > file.close() > } > // Debug query > val query = eventsCount.writeStream > .format("console") > .outputMode("complete") > .option("truncate", "false") > .trigger(Trigger.ProcessingTime("5 seconds")) > .start() > writeFile(new Path(dir, "file1"), """ > |OLD;2019-08-09 10:05:00 > |OLD;2019-08-09 10:10:00 > |OLD;2019-08-09 10:15:00""".stripMargin) > query.processAllAvailable() > val lp1 = query.lastProgress > println(lp1.eventTime) > writeFile(new Path(dir, "file2"), """ > |NEW;2020-08-29 10:05:00 > |NEW;2020-08-29 10:10:00 > |NEW;2020-08-29 10:15:00""".stripMargin) > query.processAllAvailable() > val lp2 = query.lastProgress > println(lp2.eventTime) > writeFile(new Path(dir, "file4"), """ > |OLD;2017-08-10 10:05:00 > |OLD;2017-08-10 10:10:00 > |OLD;2017-08-10 10:15:00""".stripMargin) > writeFile(new Path(dir, "file3"), "") > query.processAllAvailable() > val lp3 = query.lastProgress > println(lp3.eventTime) > query.awaitTermination() > fs.delete(dir, true) > } > {code} > OUTPUT: > > {code:java} > --- > Batch: 0 > --- > +--+-+ > |window |count| > +--+-+ > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +--+-+ > {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, > watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z} > --- > Batch: 1 > --- > +--+-+ > |window |count| > +--+-+ > |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | > |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +--+-+ > {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, > watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z} > --- > Batch: 2 > --- > +--+-+ > |window |count| > +--+-+ > |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 | > 
|[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | > |[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 | > |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | > |[2019-08-09 10:05:00,