Re: [Structured Streaming] Trying to use Spark structured streaming

2017-09-11 Thread Eduardo D'Avila
Burak, thanks for the resources.

I thought that the trigger interval and the sliding window were the same
thing, but now I am confused.
I didn't know there was a .trigger() method, since the official Programming
Guide
<https://spark.apache.org/docs/2.1.1/structured-streaming-programming-guide.html>
doesn't even mention it(!)

Calling .trigger(ProcessingTime("1 minute")) made the program generate
output at most once each minute, instead of 2-3 times each minute.
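
For reference, this is roughly how I set it up (a simplified sketch;
"counts" stands for the windowed aggregation from the gist):

    import org.apache.spark.sql.streaming.ProcessingTime

    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .trigger(ProcessingTime("1 minute"))  // fire at most once per minute
      .start()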

However, I'm still unable to understand what my program is outputting. For
instance, in a single trigger, this is part of the output for the same
partition/version (out of ~1000 rows):

...
(15:44:29, 15:44:30) 275
(15:44:30, 15:44:30) 259
(15:44:30, 15:44:30) 261
...


   - Why are there *multiple outputs for the same window*?
   - Why do *almost all* windows have zero length (begin and end timestamps
   are equal)?

Additionally, what is the use of a sliding window?

Thanks,

Eduardo


Re: [Structured Streaming] Trying to use Spark structured streaming

2017-09-11 Thread Burak Yavuz
Hi Eduardo,

What you have written outputs counts "as fast as possible" for windows of
5-minute length with a sliding interval of 1 minute. So a record at 10:13
would be included in the counts for the windows 10:09-10:14, 10:10-10:15,
10:11-10:16, 10:12-10:17, and 10:13-10:18.
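
To make the boundary arithmetic concrete, here is a small standalone
sketch (plain Scala, nothing Spark-specific) that enumerates the windows
containing a given minute:

    import java.time.LocalTime

    val slideMinutes  = 1L
    val windowMinutes = 5L
    val t = LocalTime.of(10, 13)

    // Window starts align to the slide, so the 5-minute windows that
    // contain t start at t, t - 1 min, ..., t - 4 min.
    val windows = (0L until windowMinutes / slideMinutes).map { i =>
      val start = t.minusMinutes(i)
      (start, start.plusMinutes(windowMinutes))
    }
    // -> (10:13,10:18), (10:12,10:17), (10:11,10:16),
    //    (10:10,10:15), (10:09,10:14)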

Please take a look at the following blog post for more details:
https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
This talk can also be helpful:
https://www.youtube.com/watch?v=JAb4FIheP28&t=942s (especially after the
19th minute)

What you seem to be looking for is the "Update" output mode (you may need
Spark 2.2 for this, IIRC), with a window duration of 5 minutes and no
sliding interval, and a processing-time trigger of 1 minute. Note that this
still doesn't guarantee one output row per trigger, since late data may
arrive (unless you set the watermark accordingly).
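
Roughly something like this (a sketch assuming Spark 2.2; "events" stands
for your Kafka-sourced DataFrame, and the watermark delay and the console
sink are illustrative placeholders):

    import org.apache.spark.sql.functions.{col, window}
    import org.apache.spark.sql.streaming.Trigger

    val counts = events
      .withWatermark("timestamp", "10 minutes")        // bound late data
      .groupBy(window(col("timestamp"), "5 minutes"))  // tumbling, no slide
      .count()

    val query = counts.writeStream
      .outputMode("update")                  // emit only windows that changed
      .format("console")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()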


Best,
Burak


[Structured Streaming] Trying to use Spark structured streaming

2017-09-11 Thread Eduardo D'Avila
Hi,

I'm trying to use Spark 2.1.1 structured streaming to *count the number of
records* from Kafka *for each time window* with the code in this GitHub gist
<https://gist.github.com/erdavila/b6ab0c216e82ae77fa8192c48cb816e4>.

I expected that, *once each minute* (the slide duration), it would *output
a single record* (since the only aggregation key is the window) with the
*record count for the last 5 minutes* (the window duration). However, it
outputs several records 2-3 times per minute, like in the sample output
included in the gist.
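
In essence, the code does a windowed count along these lines (a simplified
sketch; the Kafka options are placeholders):

    import org.apache.spark.sql.functions.{col, window}

    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "...")  // placeholder
      .option("subscribe", "...")                // placeholder
      .load()

    // count records per 5-minute window, sliding every 1 minute;
    // "timestamp" is the Kafka source's built-in timestamp column
    val counts = events
      .groupBy(window(col("timestamp"), "5 minutes", "1 minute"))
      .count()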

Changing the output mode to "append" seems to change the behavior, but the
result is still far from what I expected.

What is wrong with my assumptions about how this should work? Given the
code, how should the sample output be interpreted or used?

Thanks,

Eduardo