Thanks, Kenn. I've tried adding calls to advanceProcessingTime [1], but it doesn't seem to help: the test still hangs after processing all the data. Is this because I'm using the global window, so the window itself never gets closed? The point about the expiry timer is a good one, thanks.

Thanks,
Andrew

[1]: https://github.com/andrewrjones/beam-test-stream-timer/compare/advanceProcessingTime

On Tue, 12 Dec 2017, at 04:58, Kenneth Knowles wrote:
> Hi Andrew,
>
> This is because TestStream also controls processing time. You'll want
> to call #advanceProcessingTime [1] to move the clock forward. This
> example brings up a good best practice: When you use the stateful
> DoFn, you often want to set an event time timer for window expiration
> time (that's the end of the window + allowed lateness) to make sure to
> flush anything left in state.
>
> Kenn
>
> [1]
> https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/testing/TestStream.Builder.html#advanceProcessingTime-org.joda.time.Duration-
>
> On Mon, Dec 11, 2017 at 12:51 PM, Andrew Jones
> <andrew+beam@andrew-jones.com> wrote:
>> Hi,
>>
>> I have a unit test using TestStream. It worked fine, until I added a
>> Timer to the pipeline I'm testing, and now it hangs after seemingly
>> finishing correctly.
>>
>> I've put together a minimal example at
>> https://github.com/andrewrjones/beam-test-stream-timer/blob/master/src/test/java/com/andrewjones/beam/TimerTest.java.
>>
>> I notice when I use the following, it hangs:
>>
>> .addElements(KV.of("hello", 100))
>> .addElements(KV.of("hello", 200))
>>
>> However, this seems to be fine:
>>
>> .addElements(KV.of("hello", 100), KV.of("hello", 200))
>>
>> In both cases the code seems to work as expected, judging by the
>> calls to println.
>>
>> Is this a problem with TestStream? Or should I not have KVs with the
>> same Key when using a Timer?
>>
>> Thanks,
>> Andrew
