Re: ParDo with Timer hangs when running under TestStream
No problem, thanks!

On Wed, 13 Dec 2017, at 14:30, Kenneth Knowles wrote:
> Hi Andrew,
>
> As someone else pointed out to me, I didn't read your code carefully
> enough. Your timer is an event time timer so it should fire. I've
> filed https://issues.apache.org/jira/browse/BEAM-3341 for
> investigation.
>
> Kenn
Re: ParDo with Timer hangs when running under TestStream
Hi Andrew,

As someone else pointed out to me, I didn't read your code carefully enough. Your timer is an event time timer, so it should fire. I've filed https://issues.apache.org/jira/browse/BEAM-3341 for investigation.

Kenn

On Wed, Dec 13, 2017 at 5:53 AM, Andrew Jones wrote:
> Thanks Kenn. I've tried adding calls to advanceProcessingTime[1], but it
> doesn't seem to be helping. The test still hangs after processing all the
> data.
>
> Is this because I'm using the global window? So the window itself doesn't
> ever get closed?
>
> The point about the expiry timer is a good one, thanks.
>
> Thanks,
> Andrew
>
> [1]: https://github.com/andrewrjones/beam-test-stream-timer/compare/advanceProcessingTime
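The distinction here is between the two timer domains that TestStream drives: an event time timer is governed by the watermark, while advanceProcessingTime only moves the processing-time clock. The stdlib-only Java sketch below models that separation. It is a toy under stated assumptions, not Beam's API: TimerDomainsModel, setTimer, advanceWatermarkTo and advanceProcessingTime are hypothetical names that only loosely mirror TestStream.Builder, and the firing boundary (>= rather than >) is a simplification.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model, NOT Beam's API: a test harness that owns two independent clocks,
// the way TestStream controls both the watermark and processing time.
class TimerDomainsModel {
  enum Domain { EVENT_TIME, PROCESSING_TIME }

  private static final class PendingTimer {
    final String id;
    final Domain domain;
    final long timestampMillis;

    PendingTimer(String id, Domain domain, long timestampMillis) {
      this.id = id;
      this.domain = domain;
      this.timestampMillis = timestampMillis;
    }
  }

  private final List<PendingTimer> pending = new ArrayList<>();
  private final List<String> fired = new ArrayList<>();
  private long watermarkMillis = Long.MIN_VALUE; // event-time clock
  private long processingTimeMillis = 0L;        // processing-time clock

  void setTimer(String id, Domain domain, long timestampMillis) {
    pending.add(new PendingTimer(id, domain, timestampMillis));
  }

  // Hypothetical analogue of advancing the watermark; it never moves backwards.
  void advanceWatermarkTo(long millis) {
    watermarkMillis = Math.max(watermarkMillis, millis);
    fireDueTimers();
  }

  // Hypothetical analogue of advancing processing time by a delta.
  void advanceProcessingTime(long deltaMillis) {
    processingTimeMillis += deltaMillis;
    fireDueTimers();
  }

  // Each timer consults only the clock of its own domain.
  private void fireDueTimers() {
    Iterator<PendingTimer> it = pending.iterator();
    while (it.hasNext()) {
      PendingTimer t = it.next();
      long clock = (t.domain == Domain.EVENT_TIME) ? watermarkMillis : processingTimeMillis;
      if (clock >= t.timestampMillis) {
        fired.add(t.id);
        it.remove();
      }
    }
  }

  List<String> fired() {
    return fired;
  }
}
```

In this model, advancing processing time by a minute fires only the processing-time timer; the event time timer keeps waiting until the watermark moves past its timestamp.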
Re: ParDo with Timer hangs when running under TestStream
Hi Andrew,

This is because TestStream also controls processing time. You'll want to call #advanceProcessingTime [1] to move the clock forward. This example brings up a good best practice: when you use a stateful DoFn, you often want to set an event time timer for the window expiration time (that's the end of the window + allowed lateness) to make sure to flush anything left in state.

Kenn

[1] https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/testing/TestStream.Builder.html#advanceProcessingTime-org.joda.time.Duration-

On Mon, Dec 11, 2017 at 12:51 PM, Andrew Jones wrote:
> Hi,
>
> I have a unit test using TestStream. It worked fine until I added a
> Timer to the pipeline I'm testing, and now it hangs after seemingly
> finishing correctly.
>
> I've put together a minimal example at
> https://github.com/andrewrjones/beam-test-stream-timer/blob/master/src/test/java/com/andrewjones/beam/TimerTest.java.
>
> I notice when I use the following, it hangs:
>
>     .addElements(KV.of("hello", 100))
>     .addElements(KV.of("hello", 200))
>
> However, this seems to be fine:
>
>     .addElements(KV.of("hello", 100), KV.of("hello", 200))
>
> In both cases the code seems to work as expected, judging by the calls
> to println.
>
> Is this a problem with TestStream? Or should I not have KVs with the
> same key when using a Timer?
>
> Thanks,
> Andrew
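The best practice described above (an event time timer at the window's expiration time, i.e. end of window plus allowed lateness, that flushes whatever is left in state) can be modeled in plain Java. This is a sketch under simplifying assumptions, not Beam code: a single window, integer values, and hypothetical names (FlushOnExpiryModel, processElement, advanceWatermarkTo). In a real pipeline the buffer would be a state cell declared with StateSpecs and the timer a TimerSpecs.timer(TimeDomain.EVENT_TIME) inside a stateful DoFn.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model, NOT Beam's API: one window, per-key buffered "state", and one
// event-time "expiry" timer per key, set at window end + allowed lateness.
class FlushOnExpiryModel {
  private final long windowEndMillis;
  private final long allowedLatenessMillis;
  private final Map<String, List<Integer>> buffers = new TreeMap<>(); // per-key state
  private final Map<String, Long> expiryTimers = new TreeMap<>();     // per-key timer
  private final List<String> output = new ArrayList<>();

  FlushOnExpiryModel(long windowEndMillis, long allowedLatenessMillis) {
    this.windowEndMillis = windowEndMillis;
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  // The window's expiration time: end of the window plus allowed lateness.
  long expiryMillis() {
    return windowEndMillis + allowedLatenessMillis;
  }

  void processElement(String key, int value) {
    buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    // Setting the timer again for the same key simply overwrites it.
    expiryTimers.put(key, expiryMillis());
  }

  // When the watermark reaches a key's expiry timer, emit and clear that key's state.
  void advanceWatermarkTo(long watermarkMillis) {
    Iterator<Map.Entry<String, Long>> it = expiryTimers.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> timer = it.next();
      if (watermarkMillis >= timer.getValue()) {
        String key = timer.getKey();
        List<Integer> flushed = buffers.remove(key);
        output.add(key + "=" + flushed);
        it.remove();
      }
    }
  }

  List<String> output() {
    return output;
  }

  boolean hasState(String key) {
    return buffers.containsKey(key);
  }
}
```

Nothing is emitted while the watermark is still inside the allowed lateness; once it passes the expiry time, the buffered values for "hello" come out together and the state is cleared. In the global window the window end is effectively the end of time, so a timer set this way would only fire once the watermark reaches the end of the global window, for example after TestStream.Builder#advanceWatermarkToInfinity().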