Thanks Kenn. I've tried adding calls to advanceProcessingTime[1], but
it doesn't seem to be helping. The test still hangs after processing
all the data.
Is this because I'm using the global window? So the window itself
doesn't ever get closed?
The point about the expiry timer is a good one, thanks.
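To check that I've understood the expiry arithmetic (end of the window + allowed lateness), here is a minimal sketch in plain Java. It deliberately has no Beam dependency. The class name WindowExpiry, the epoch-aligned fixed windows, and the one-minute/30-second values are all my own assumptions for illustration, not anything taken from the pipeline or from Beam itself.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java sketch of the expiry arithmetic; hypothetical helper, not Beam code. */
public class WindowExpiry {

    /** Start of the fixed window containing the timestamp (assumes windows aligned to the epoch). */
    public static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch too.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }

    /** Expiration time = end of window + allowed lateness. */
    public static Instant expiry(Instant timestamp, Duration size, Duration allowedLateness) {
        Instant windowEnd = windowStart(timestamp, size).plus(size);
        return windowEnd.plus(allowedLateness);
    }

    public static void main(String[] args) {
        // Assumed example values: one-minute windows, 30 seconds of allowed lateness.
        Instant element = Instant.parse("2017-12-11T12:51:30Z");
        Instant expires = expiry(element, Duration.ofMinutes(1), Duration.ofSeconds(30));
        System.out.println(expires); // prints 2017-12-11T12:52:30Z
    }
}
```

In the real DoFn I'd expect the equivalent value to be window.maxTimestamp().plus(allowedLateness), passed to set(...) on an event time timer, but I haven't verified that against my pipeline yet.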

Thanks,
Andrew

[1]: https://github.com/andrewrjones/beam-test-stream-timer/compare/advanceProcessingTime
On Tue, 12 Dec 2017, at 04:58, Kenneth Knowles wrote:
> Hi Andrew,
> 
> This is because TestStream also controls processing time. You'll want
> to call #advanceProcessingTime [1] to move the clock forward. This
> example brings up a good best practice: When you use the stateful
> DoFn, you often want to set an event time timer for window expiration
> time (that's the end of the window + allowed lateness) to make sure to
> flush anything left in state.
> 
> Kenn
> 
> [1] 
> https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/testing/TestStream.Builder.html#advanceProcessingTime-org.joda.time.Duration-
>  
> On Mon, Dec 11, 2017 at 12:51 PM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>> Hi,
>> 
>>  I have a unit test using TestStream. It worked fine until I added a
>>  Timer to the pipeline I'm testing, and now it hangs after seemingly
>>  finishing correctly.
>> 
>>  I've put together a minimal example at
>>  https://github.com/andrewrjones/beam-test-stream-timer/blob/master/src/test/java/com/andrewjones/beam/TimerTest.java.
>> 
>>  I notice when I use the following, it hangs:
>> 
>>  .addElements(KV.of("hello", 100))
>>  .addElements(KV.of("hello", 200))
>> 
>>  However, this seems to be fine:
>> 
>>  .addElements(KV.of("hello", 100), KV.of("hello", 200))
>> 
>>  In both cases the code seems to work as expected, judging by
>>  the calls to println.
>> 
>>  Is this a problem with TestStream? Or should I not have KVs with the
>>  same Key when using a Timer?
>> 
>>  Thanks,
>>  Andrew
