Hi guys, I have opposite issue :-) I would like to unit test negative behavior - that the Event Time timer is not fired when no further event arrives (which would advance the watermarks). But due to StreamSource firing Long.MAX_VALUE watermark after enclosed finite FromElementsFunction run method depletes, I was get all timers fired in tested operator (and ProcessFunction). Is anybody aware of a method, how to test or design Source Function prevent watermark advancing, so that the timers are not fired (which is desired behavior - to be guarded by unit tests). Thanks. Michal
On Thu, May 9, 2019 at 9:02 AM Till Rohrmann <trohrm...@apache.org> wrote: > > Hi Steve, > > afaik there is no such thing in Flink. I agree that Flink's testing utilities > should be improved. If you implement such a source, then you might be able to > contribute it back to the community. That would be super helpful. > > Cheers, > Till > > On Wed, May 8, 2019 at 6:40 PM Steven Nelson <snel...@sourceallies.com> wrote: >> >> >> That’s what I figured was happening :( Your explanation is a lot better than >> what I gave to my team, so that will help a lot, thank you! >> >> Is there a testing source already created that does this sort of thing? The >> Flink-testing library seems a bit sparse. >> >> -Steve >> >> Sent from my iPhone >> >> On May 8, 2019, at 9:33 AM, Till Rohrmann <trohrm...@apache.org> wrote: >> >> Hi Steve, >> >> I think the reason for the different behaviour is due to the way event time >> and processing time are implemented. >> >> When you are using event time, watermarks need to travel through the >> topology denoting the current event time. When you source terminates, the >> system will send a watermark with Long.MAX_VALUE through the topology. This >> will effectively trigger the completion of all pending event time operations. >> >> In the case of processing time, Flink does not do this. Instead it simply >> relies on the processing time clocks on each machine. Hence, there is no way >> for Flink to tell the different machines that their respective processing >> time clocks should proceed to a certain time in case of a shutdown. Instead >> you should make sure that you don't terminate the job before a certain time >> (processing time) has passed. You could do this by adding a sleep to your >> source function after you've output all records and just before leaving the >> source loop. >> >> Cheers, >> Till >> >> On Tue, May 7, 2019 at 11:49 PM Steven Nelson <snel...@sourceallies.com> >> wrote: >>> >>> Hello! >>> >>> I am trying to write a test that runs in the TestEnviroment. I create a >>> process that uses ProcessingTime, has a source constructed from a >>> FromElementsFunction and runs data through a Keyed Stream into a >>> ProcessingTimeSessionWindows.withGap(). >>> >>> The problem is that it appears that the env.execute method returns >>> immediately after the session closes, not allowing the events to be >>> released from the window before shutdown occurs. This used to work when I >>> used EventTime. >>> >>> Thoughts? >>> -Steve