[ https://issues.apache.org/jira/browse/SPARK-24382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523227#comment-16523227 ]
Richard Yu commented on SPARK-24382:
------------------------------------

[~karthikus] Yes, the old dates do in fact matter, primarily because of the ten-second watermark you set. As the Spark Structured Streaming Programming Guide explains, data that arrives later than the allowed lateness you specify (in your case, 10 seconds) relative to when the data was generated is considered too late and is not processed. Judging by your sample data, the records were generated on May 1st, while you are processing them on May 24th. In other words, you are feeding the data to Spark far *later* than allowed (we are talking days here), so, as expected, nothing is output.

> Spark Structured Streaming aggregation on old timestamp data
> ------------------------------------------------------------
>
>                 Key: SPARK-24382
>                 URL: https://issues.apache.org/jira/browse/SPARK-24382
>             Project: Spark
>          Issue Type: Question
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Karthik
>            Priority: Major
>              Labels: beginner
>
> I am trying to aggregate the count of records every 10 seconds using
> structured streaming for the following incoming Kafka data:
> {code:java}
> {
>   "ts2" : "2018/05/01 00:02:50.041",
>   "serviceGroupId" : "123",
>   "userId" : "avv-0",
>   "stream" : "",
>   "lastUserActivity" : "00:02:50",
>   "lastUserActivityCount" : "0"
> }
> {
>   "ts2" : "2018/05/01 00:09:02.079",
>   "serviceGroupId" : "123",
>   "userId" : "avv-0",
>   "stream" : "",
>   "lastUserActivity" : "00:09:02",
>   "lastUserActivityCount" : "0"
> }
> {
>   "ts2" : "2018/05/01 00:09:02.086",
>   "serviceGroupId" : "123",
>   "userId" : "avv-2",
>   "stream" : "",
>   "lastUserActivity" : "00:09:02",
>   "lastUserActivityCount" : "0"
> }
> {code}
> with the following logic:
> {code:java}
> val sdvTuneInsAgg1 = df
>   .withWatermark("ts2", "10 seconds")
>   .groupBy(window(col("ts2"), "10 seconds"))
>   .agg(count("*") as "count")
>   .as[CountMetric1]
>
> val query1 = sdvTuneInsAgg1.writeStream
>   .format("console")
>   .foreach(writer)
>   .start()
> {code}
> and I do not see any records inside the writer. The only anomaly is that
> the current date is 2018/05/24, while the records I am processing (ts2)
> carry old dates. Will aggregation / count work in this scenario?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
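The late-data rule cited in the comment can be illustrated with a minimal, hypothetical sketch. This is a simplified Python model of event-time watermarking, not Spark's actual implementation: Spark tracks the maximum event time observed so far, lags it by the configured delay to form the watermark, and stateful aggregations drop records whose event time falls behind that watermark. The `process` and `ts` helpers are invented for this illustration.

```python
from datetime import datetime, timedelta

# Mirrors .withWatermark("ts2", "10 seconds") from the issue.
DELAY = timedelta(seconds=10)

def ts(s: str) -> datetime:
    """Parse a timestamp in the issue's "yyyy/MM/dd HH:mm:ss" format."""
    return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

def process(batches):
    """Feed micro-batches of event times; return (kept, dropped) lists.

    Simplified model: the watermark is the maximum event time seen so
    far minus DELAY, and it only advances between micro-batches.
    """
    max_event_time = None
    kept, dropped = [], []
    for batch in batches:
        watermark = max_event_time - DELAY if max_event_time else None
        for t in batch:
            if watermark is not None and t < watermark:
                dropped.append(t)  # behind the watermark: too late
            else:
                kept.append(t)
                if max_event_time is None or t > max_event_time:
                    max_event_time = t
    return kept, dropped

# A 00:02:50 record arriving after 00:09:02 data has been seen is more
# than 10 seconds behind the watermark (00:08:52), so it is discarded.
kept, dropped = process([
    [ts("2018/05/01 00:09:02")],  # batch 1 advances the watermark
    [ts("2018/05/01 00:02:50")],  # batch 2 arrives far too late
])
print(len(kept), len(dropped))  # 1 1
```

Note the nuance this model makes visible: the watermark is derived from event time in the data, so whether old records are dropped depends on what event times the query has already seen, not on the wall-clock date alone.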