Re: ParDo with Timer hangs when running under TestStream

2017-12-13 Thread Andrew Jones
No problem, thanks!


On Wed, 13 Dec 2017, at 14:30, Kenneth Knowles wrote:
> Hi Andrew,
> 
> As someone else pointed out to me, I didn't read your code carefully
> enough. Your timer is an event time timer so it should fire. I've
> filed https://issues.apache.org/jira/browse/BEAM-3341 for
> investigation.
>
> Kenn
> 
> On Wed, Dec 13, 2017 at 5:53 AM, Andrew Jones wrote:
>> Thanks Kenn. I've tried adding calls to advanceProcessingTime[1], but
>> it doesn't seem to be helping. The test still hangs after processing
>> all the data.
>>
>> Is this because I'm using the global window? So the window itself
>> doesn't ever get closed?
>>
>> The point about the expiry timer is a good one, thanks.
>> 
>> Thanks,
>> Andrew
>> 
>> [1]: https://github.com/andrewrjones/beam-test-stream-timer/compare/advanceProcessingTime
>>
>> On Tue, 12 Dec 2017, at 04:58, Kenneth Knowles wrote:
>>> Hi Andrew,
>>> 
>>> This is because TestStream also controls processing time. You'll
>>> want to call #advanceProcessingTime [1] to move the clock forward.
>>> This example brings up a good best practice: When you use the
>>> stateful DoFn, you often want to set an event time timer for window
>>> expiration time (that's the end of the window + allowed lateness) to
>>> make sure to flush anything left in state.
>>>
>>> Kenn
>>> 
>>> [1] https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/testing/TestStream.Builder.html#advanceProcessingTime-org.joda.time.Duration-
>>>
>>> On Mon, Dec 11, 2017 at 12:51 PM, Andrew Jones wrote:
>>>> Hi,
>>>>
>>>> I have a unit test using TestStream. It worked fine, until I
>>>> added a Timer to the pipeline I'm testing, and now it hangs after
>>>> seemingly finishing correctly.
>>>>
>>>> I've put together a minimal example at
>>>> https://github.com/andrewrjones/beam-test-stream-timer/blob/master/src/test/java/com/andrewjones/beam/TimerTest.java.
>>>> I notice when I use the following, it hangs:
>>>>
>>>> .addElements(KV.of("hello", 100))
>>>> .addElements(KV.of("hello", 200))
>>>>
>>>> However, this seems to be fine:
>>>>
>>>> .addElements(KV.of("hello", 100), KV.of("hello", 200))
>>>>
>>>> In both cases the code seems to work as expected, judging by the
>>>> calls to println.
>>>>
>>>> Is this a problem with TestStream? Or should I not have KVs
>>>> with the same Key when using a Timer?
>>>>
>>>> Thanks,
>>>> Andrew



Re: ParDo with Timer hangs when running under TestStream

2017-12-13 Thread Kenneth Knowles
Hi Andrew,

As someone else pointed out to me, I didn't read your code carefully
enough. Your timer is an event time timer so it should fire. I've filed
https://issues.apache.org/jira/browse/BEAM-3341 for investigation.

Kenn
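
To illustrate the distinction drawn above, here is a toy model in plain Java of why advancing processing time would not be expected to affect an event-time timer. It is not Beam code and not how any runner is implemented: the class TimerDomains, its nested types, and the "fires once the clock reaches the timestamp" rule are all assumptions made for illustration. Only the method names are borrowed from TestStream.Builder (advanceProcessingTime, advanceWatermarkTo).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not Beam): two clocks, and timers bound to one of them. */
public class TimerDomains {
    public enum Domain { EVENT_TIME, PROCESSING_TIME }

    public static final class ToyTimer {
        public final Domain domain;
        public final long fireAt;      // millis on the clock of its own domain
        public boolean fired = false;

        ToyTimer(Domain domain, long fireAt) {
            this.domain = domain;
            this.fireAt = fireAt;
        }
    }

    private long watermark = 0;        // event-time clock
    private long processingTime = 0;   // processing-time clock
    private final List<ToyTimer> timers = new ArrayList<>();

    public ToyTimer set(Domain domain, long fireAt) {
        ToyTimer timer = new ToyTimer(domain, fireAt);
        timers.add(timer);
        return timer;
    }

    /** Moves only the processing-time clock, so only processing-time timers can fire. */
    public void advanceProcessingTime(long millis) {
        processingTime += millis;
        fire(Domain.PROCESSING_TIME, processingTime);
    }

    /** Moves only the watermark, so only event-time timers can fire. */
    public void advanceWatermarkTo(long millis) {
        watermark = Math.max(watermark, millis);
        fire(Domain.EVENT_TIME, watermark);
    }

    private void fire(Domain domain, long now) {
        for (ToyTimer timer : timers) {
            if (timer.domain == domain && !timer.fired && timer.fireAt <= now) {
                timer.fired = true;
            }
        }
    }

    public static void main(String[] args) {
        TimerDomains clocks = new TimerDomains();
        ToyTimer eventTimer = clocks.set(Domain.EVENT_TIME, 1_000);

        clocks.advanceProcessingTime(60_000);
        System.out.println("after advancing processing time: " + eventTimer.fired); // false

        clocks.advanceWatermarkTo(1_000);
        System.out.println("after advancing the watermark: " + eventTimer.fired);   // true
    }
}
```

In this model the event-time timer stays unfired however far the processing-time clock moves, and fires only once the watermark reaches its timestamp.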

On Wed, Dec 13, 2017 at 5:53 AM, Andrew Jones wrote:

> Thanks Kenn. I've tried adding calls to advanceProcessingTime[1], but it
> doesn't seem to be helping. The test still hangs after processing all the
> data.
>
> Is this because I'm using the global window? So the window itself doesn't
> ever get closed?
>
> The point about the expiry timer is a good one, thanks.
>
> Thanks,
> Andrew
>
> [1]: https://github.com/andrewrjones/beam-test-stream-timer/compare/advanceProcessingTime
>
> On Tue, 12 Dec 2017, at 04:58, Kenneth Knowles wrote:
>
> Hi Andrew,
>
> This is because TestStream also controls processing time. You'll want to
> call #advanceProcessingTime [1] to move the clock forward. This example
> brings up a good best practice: When you use the stateful DoFn, you often
> want to set an event time timer for window expiration time (that's the end
> of the window + allowed lateness) to make sure to flush anything left in
> state.
>
> Kenn
>
> [1] https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/testing/TestStream.Builder.html#advanceProcessingTime-org.joda.time.Duration-
>
> On Mon, Dec 11, 2017 at 12:51 PM, Andrew Jones <andrew+b...@andrew-jones.com> wrote:
>
> Hi,
>
> I have a unit test using TestStream. It worked fine, until I added a
> Timer to the pipeline I'm testing, and now it hangs after seemingly
> finishing correctly.
>
> I've put together a minimal example at
> https://github.com/andrewrjones/beam-test-stream-timer/blob/master/src/test/java/com/andrewjones/beam/TimerTest.java.
> I notice when I use the following, it hangs:
>
> .addElements(KV.of("hello", 100))
> .addElements(KV.of("hello", 200))
>
> However, this seems to be fine:
>
> .addElements(KV.of("hello", 100), KV.of("hello", 200))
>
> In both cases the code seems to work as expected, judging by the calls
> to println.
>
> Is this a problem with TestStream? Or should I not have KVs with the
> same Key when using a Timer?
>
> Thanks,
> Andrew
>
>
>


Re: ParDo with Timer hangs when running under TestStream

2017-12-11 Thread Kenneth Knowles
Hi Andrew,

This is because TestStream also controls processing time. You'll want to
call #advanceProcessingTime [1] to move the clock forward. This example
brings up a good best practice: When you use the stateful DoFn, you often
want to set an event time timer for window expiration time (that's the end
of the window + allowed lateness) to make sure to flush anything left in
state.

Kenn

[1]
https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/testing/TestStream.Builder.html#advanceProcessingTime-org.joda.time.Duration-
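
As a worked example of the "end of the window + allowed lateness" arithmetic mentioned above, the sketch below computes a window expiration time in plain Java. It is only an illustration: the class WindowExpiry and the sample values are made up, and java.time stands in for the Joda-Time Instant and Duration types that the Beam Java SDK uses. In a real stateful DoFn, the resulting timestamp would be the one passed when setting the event-time timer.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: computes when state for a window can be flushed. */
public class WindowExpiry {

    /** Expiration time = end of the window + allowed lateness. */
    public static Instant expiry(Instant windowEnd, Duration allowedLateness) {
        return windowEnd.plus(allowedLateness);
    }

    public static void main(String[] args) {
        // Sample values chosen for illustration only.
        Instant windowEnd = Instant.parse("2017-12-11T12:00:00Z");
        Duration allowedLateness = Duration.ofMinutes(5);

        System.out.println(expiry(windowEnd, allowedLateness)); // 2017-12-11T12:05:00Z
    }
}
```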

On Mon, Dec 11, 2017 at 12:51 PM, Andrew Jones wrote:

> Hi,
>
> I have a unit test using TestStream. It worked fine, until I added a
> Timer to the pipeline I'm testing, and now it hangs after seemingly
> finishing correctly.
>
> I've put together a minimal example at
> https://github.com/andrewrjones/beam-test-stream-timer/blob/master/src/test/java/com/andrewjones/beam/TimerTest.java.
> I notice when I use the following, it hangs:
>
> .addElements(KV.of("hello", 100))
> .addElements(KV.of("hello", 200))
>
> However, this seems to be fine:
>
> .addElements(KV.of("hello", 100), KV.of("hello", 200))
>
> In both cases the code seems to work as expected, judging by the calls
> to println.
>
> Is this a problem with TestStream? Or should I not have KVs with the
> same Key when using a Timer?
>
> Thanks,
> Andrew
>