Hi guys,
I have the opposite issue :-) I would like to unit test negative behavior:
that the event-time timer is not fired when no further event arrives
(one that would advance the watermark).
But because StreamSource emits a Long.MAX_VALUE watermark once the run
method of the enclosed finite FromElementsFunction returns, all timers
fire in the tested operator (and ProcessFunction).
Is anybody aware of a way to test this, or to design a SourceFunction
that prevents the watermark from advancing, so that the timers are not
fired (which is the desired behavior, to be guarded by unit tests)?
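
A minimal sketch of the behavior in question, written as a toy model in
plain Java rather than against Flink's API. The names EventTimeTimers and
runSource are hypothetical and exist only for this illustration; the only
thing taken from the thread is that a terminating finite source results in
a Long.MAX_VALUE watermark, which fires every pending event-time timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of watermark-driven event-time timers; plain Java, not Flink's API. */
class WatermarkTimerSketch {

    /** Hypothetical stand-in for an operator's event-time timer service. */
    static final class EventTimeTimers {
        private final PriorityQueue<Long> pending = new PriorityQueue<>();
        private final List<Long> fired = new ArrayList<>();
        private long currentWatermark = Long.MIN_VALUE;

        void registerTimer(long timestamp) {
            pending.add(timestamp);
        }

        /** Fires every pending timer whose timestamp is <= the new watermark. */
        void advanceWatermark(long watermark) {
            currentWatermark = Math.max(currentWatermark, watermark);
            while (!pending.isEmpty() && pending.peek() <= currentWatermark) {
                fired.add(pending.poll());
            }
        }

        List<Long> firedTimers() {
            return fired;
        }
    }

    /**
     * Registers one timer at t=1000, replays the given watermarks, and
     * optionally emits the final Long.MAX_VALUE watermark that a
     * terminating finite source would cause.
     */
    static List<Long> runSource(long[] watermarks, boolean emitFinalWatermark) {
        EventTimeTimers timers = new EventTimeTimers();
        timers.registerTimer(1_000L); // e.g. a timeout set by the first event
        for (long wm : watermarks) {
            timers.advanceWatermark(wm);
        }
        if (emitFinalWatermark) {
            // Models the end-of-input watermark described in this thread.
            timers.advanceWatermark(Long.MAX_VALUE);
        }
        return new ArrayList<>(timers.firedTimers());
    }

    public static void main(String[] args) {
        long[] watermarks = {100L, 500L};
        System.out.println("finite source: " + runSource(watermarks, true));
        System.out.println("idle source:   " + runSource(watermarks, false));
    }
}
```

In this model the negative test corresponds to the second call: the
watermark never reaches the timer's timestamp, so nothing fires. In a real
job the counterpart would presumably be a source whose run method does not
return before the assertions are made, but that is an assumption and not
something confirmed in this thread.
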
Thanks.
Michal

On Thu, May 9, 2019 at 9:02 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Steve,
>
> afaik there is no such thing in Flink. I agree that Flink's testing utilities 
> should be improved. If you implement such a source, then you might be able to 
> contribute it back to the community. That would be super helpful.
>
> Cheers,
> Till
>
> On Wed, May 8, 2019 at 6:40 PM Steven Nelson <snel...@sourceallies.com> wrote:
>>
>>
>> That’s what I figured was happening :( Your explanation is a lot better than 
>> what I gave to my team, so that will help a lot, thank you!
>>
>> Is there a testing source already created that does this sort of thing? The 
>> Flink-testing library seems a bit sparse.
>>
>> -Steve
>>
>> On May 8, 2019, at 9:33 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Steve,
>>
>> I think the different behaviour is due to the way event time and 
>> processing time are implemented.
>>
>> When you are using event time, watermarks need to travel through the 
>> topology denoting the current event time. When your source terminates, the 
>> system will send a watermark with Long.MAX_VALUE through the topology. This 
>> will effectively trigger the completion of all pending event time operations.
>>
>> In the case of processing time, Flink does not do this. Instead it simply 
>> relies on the processing time clocks on each machine. Hence, there is no way 
>> for Flink to tell the different machines that their respective processing 
>> time clocks should proceed to a certain time in case of a shutdown. Instead 
>> you should make sure that you don't terminate the job before a certain time 
>> (processing time) has passed. You could do this by adding a sleep to your 
>> source function after you've output all records and just before leaving the 
>> source loop.
>>
>> Cheers,
>> Till
>>
>> On Tue, May 7, 2019 at 11:49 PM Steven Nelson <snel...@sourceallies.com> 
>> wrote:
>>>
>>> Hello!
>>>
>>> I am trying to write a test that runs in the TestEnvironment. I create a 
>>> process that uses ProcessingTime, has a source constructed from a 
>>> FromElementsFunction and runs data through a Keyed Stream into a 
>>> ProcessingTimeSessionWindows.withGap().
>>>
>>> The problem is that it appears that the env.execute method returns 
>>> immediately after the session closes, not allowing the events to be 
>>> released from the window before shutdown occurs. This used to work when I 
>>> used EventTime.
>>>
>>> Thoughts?
>>> -Steve
