[jira] [Commented] (SPARK-21944) Watermark on window column is wrong

2017-09-08 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158948#comment-16158948
 ] 

Kevin Zhang commented on SPARK-21944:
-

[~mgaido] When you say "define the watermark on the column 'time'", do you 
mean the following?

{code:java}
val counts = events.select(window($"time", "5 seconds"), $"time", $"id")
  .withWatermark("time", "10 seconds")
  .dropDuplicates("id", "window")
  .groupBy("window")
  .count
{code}
I don't know whether this is right, because the documentation indicates that we 
should use the same column as the one used in the watermark, that is, the 
"time" column (which is not what I want). I tried it and the application 
doesn't throw any exception, but it doesn't drop events older than the 
watermark as expected. In the following example, after the batch containing an 
event with time=1504774540 (2017/9/7 16:55:40 CST) is processed, the watermark 
should be adjusted to 2017/9/7 16:55:30 CST. I then send an event with 
time=1504745724 (2017/9/7 8:55:24 CST), and this event is processed instead of 
being dropped.

{code:java}
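+---------------------------------------------+-----+
|window                                       |count|
+---------------------------------------------+-----+
|[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1    |
|[2017-09-07 08:55:20.0,2017-09-07 08:55:25.0]|1    |
|[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3    |
+---------------------------------------------+-----+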

{min=2017-09-07T00:55:24.000Z, avg=2017-09-07T00:55:24.000Z, 
watermark=2017-09-07T08:55:30.000Z, max=2017-09-07T00:55:24.000Z}
{code}

One important point: my time zone is CST, not UTC. The start and end times of 
the windows are correct, but the watermark is reported in UTC. I don't know 
whether this has any effect.
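
For reference, a minimal sketch of the arithmetic behind these timestamps. It assumes that "CST" here means China Standard Time (UTC+8, zone id Asia/Shanghai), which is consistent with the epoch values and wall-clock times quoted above. The object and method names are illustrative only and are not part of Spark.

```scala
import java.time.{Duration, Instant, ZoneId}
import java.time.format.DateTimeFormatter

object WatermarkCheck {
  // Assumption: "CST" in this comment is China Standard Time (UTC+8).
  val cst: ZoneId = ZoneId.of("Asia/Shanghai")
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Render an epoch-seconds value as a wall-clock time in CST.
  def inCst(epochSeconds: Long): String =
    Instant.ofEpochSecond(epochSeconds).atZone(cst).format(fmt)

  // The expected watermark: newest event time seen so far minus the allowed delay.
  def watermark(maxEventTime: Instant, delay: Duration): Instant =
    maxEventTime.minus(delay)

  def main(args: Array[String]): Unit = {
    val newest = Instant.ofEpochSecond(1504774540L)
    val late   = Instant.ofEpochSecond(1504745724L)
    val wm     = watermark(newest, Duration.ofSeconds(10))

    println(inCst(1504774540L)) // 2017-09-07 16:55:40
    println(inCst(1504745724L)) // 2017-09-07 08:55:24
    println(wm)                 // 2017-09-07T08:55:30Z (Instant prints in UTC)
    println(late.isBefore(wm))  // true: the second event is older than the watermark
  }
}
```

Under that assumption, the reported watermark of 08:55:30 UTC is the same instant as 16:55:30 CST, so the UTC rendering alone does not explain why the late event was kept.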

If anything is unclear, please point it out and I will explain. Thanks.




> Watermark on window column is wrong
> ---
>
> Key: SPARK-21944
> URL: https://issues.apache.org/jira/browse/SPARK-21944
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Kevin Zhang
>
> When I use a watermark with dropDuplicates in the following way, the 
> watermark is calculated incorrectly:
> {code:java}
> val counts = events.select(window($"time", "5 seconds"), $"time", $"id")
>   .withWatermark("window", "10 seconds")
>   .dropDuplicates("id", "window")
>   .groupBy("window")
>   .count
> {code}
> where events is a DataFrame with a timestamp column "time" and a long column 
> "id".
> I registered a listener to print the event-time stats for each batch, and the 
> results look like the following:
> {code:shell}
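> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---------------------------------------------+-----+
> |window                                       |count|
> +---------------------------------------------+-----+
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3    |
> +---------------------------------------------+-----+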
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T00:00:00.000Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
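> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +---------------------------------------------+-----+
> |window                                       |count|
> +---------------------------------------------+-----+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1    |
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3    |
> +---------------------------------------------+-----+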
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
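> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +---------------------------------------------+-----+
> |window                                       |count|
> +---------------------------------------------+-----+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1    |
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|4    |
> +---------------------------------------------+-----+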
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
> {code}
> As can be seen, the event-time stats are wrong: they are always on 
> 1970-01-01, so the watermark is calculated incorrectly.

[jira] [Commented] (SPARK-21944) Watermark on window column is wrong

2017-09-08 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158406#comment-16158406
 ] 

Marco Gaido commented on SPARK-21944:
-

[~kevinzhang] You should define the watermark on the column `"time"`, not the 
column `"window"`.
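
A minimal sketch of one way to apply this suggestion. It rests on an assumption that this thread does not confirm: that the watermark should be declared on "time" before the "window" column is derived from it, so that the window is computed from an already watermarked column. The function name windowedCounts is hypothetical; the column names and intervals are taken from the issue description.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.window

object WatermarkOnTime {
  // Hypothetical helper: `events` is expected to have a timestamp column "time"
  // and a long column "id", as in the issue description.
  def windowedCounts(events: DataFrame): DataFrame = {
    import events.sparkSession.implicits._

    events
      // Assumption: declare the watermark on the raw event-time column first...
      .withWatermark("time", "10 seconds")
      // ...and only then derive the 5-second tumbling window from it.
      .select(window($"time", "5 seconds"), $"time", $"id")
      .dropDuplicates("id", "window")
      .groupBy("window")
      .count()
  }
}
```

Whether this ordering is enough to make late events be dropped in the dropDuplicates case is not established here; it only shows the watermark being attached to "time" rather than to the struct-typed "window" column.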




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21944) Watermark on window column is wrong

2017-09-07 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158055#comment-16158055
 ] 

Kevin Zhang commented on SPARK-21944:
-

I found the problem while using the Kafka source to consume our internal 
topics, but it was hard to verify what the problem was there. So I switched to 
the socket source and produced some data myself, such as (1504774520 1), 
(1504774521 2), (1504774540 1), (1504774520 4)..., where the first element is 
the timestamp and the second element is the id. This makes it easy to produce 
sample data and reproduce the problem. The following is one set of data I used:

{code:shell}
1504774520 1
1504774521 2
1504774520 3
1504774540 1 
1504774520 4
1504774531 1
1504774532 1
1504774533 1
1504774520 1
1504774520 10
1504774526 11
{code}
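
To make the expected counts for this data easy to check by hand, here is a small plain-Scala model of the query's windowing and deduplication. It is only a sketch: it does not use Spark, it ignores the watermark entirely, and it assumes 5-second tumbling windows aligned to the epoch. The names WindowModel, Event, parse, windowStart and countsPerWindow are made up for illustration.

```scala
object WindowModel {
  // One input line: "<epoch seconds> <id>".
  final case class Event(timeSec: Long, id: Long)

  def parse(line: String): Event = {
    val Array(t, id) = line.trim.split("\\s+")
    Event(t.toLong, id.toLong)
  }

  // Start (in epoch seconds) of the tumbling window that contains timeSec.
  def windowStart(timeSec: Long, widthSec: Long = 5L): Long =
    timeSec - (timeSec % widthSec)

  // Deduplicate on (id, window), then count the remaining rows per window.
  // No late data is dropped in this model.
  def countsPerWindow(events: Seq[Event]): Map[Long, Int] =
    events
      .map(e => (e.id, windowStart(e.timeSec)))
      .distinct
      .groupBy(_._2)
      .map { case (start, rows) => start -> rows.size }

  val sample: Seq[String] = Seq(
    "1504774520 1", "1504774521 2", "1504774520 3", "1504774540 1",
    "1504774520 4", "1504774531 1", "1504774532 1", "1504774533 1",
    "1504774520 1", "1504774520 10", "1504774526 11"
  )

  def main(args: Array[String]): Unit = {
    // Print window start -> count, ordered by window start.
    countsPerWindow(sample.map(parse)).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Fed only the first five lines, this model gives 4 for the window starting at 1504774520 (16:55:20 CST) and 1 for the window starting at 1504774540 (16:55:40 CST), which matches the Batch 2 output quoted in the issue description.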
 


[jira] [Commented] (SPARK-21944) Watermark on window column is wrong

2017-09-07 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16157091#comment-16157091
 ] 

Marco Gaido commented on SPARK-21944:
-

Could you please provide some sample data to reproduce the issue? Thanks.
