Hi Michal, you need to implement a source which does not terminate. Take a look at the InifiteSource [1] which does exactly this. That way there won't be a Long.MAX_VALUE being sent when closing the source operator.
[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java#L240 Cheers, Till On Fri, Aug 9, 2019 at 12:00 PM Michal Klempa <michal.kle...@gmail.com> wrote: > Hi guys, > I have opposite issue :-) I would like to unit test negative behavior > - that the Event Time timer is not fired when no further event arrives > (which would advance the watermarks). > But due to StreamSource firing Long.MAX_VALUE watermark after enclosed > finite FromElementsFunction run method depletes, I was get all timers > fired in tested operator (and ProcessFunction). > Is anybody aware of a method, how to test or design Source Function > prevent watermark advancing, so that the timers are not fired (which > is desired behavior - to be guarded by unit tests). > Thanks. > Michal > > On Thu, May 9, 2019 at 9:02 AM Till Rohrmann <trohrm...@apache.org> wrote: > > > > Hi Steve, > > > > afaik there is no such thing in Flink. I agree that Flink's testing > utilities should be improved. If you implement such a source, then you > might be able to contribute it back to the community. That would be super > helpful. > > > > Cheers, > > Till > > > > On Wed, May 8, 2019 at 6:40 PM Steven Nelson <snel...@sourceallies.com> > wrote: > >> > >> > >> That’s what I figured was happening :( Your explanation is a lot better > than what I gave to my team, so that will help a lot, thank you! > >> > >> Is there a testing source already created that does this sort of thing? > The Flink-testing library seems a bit sparse. > >> > >> -Steve > >> > >> Sent from my iPhone > >> > >> On May 8, 2019, at 9:33 AM, Till Rohrmann <trohrm...@apache.org> wrote: > >> > >> Hi Steve, > >> > >> I think the reason for the different behaviour is due to the way event > time and processing time are implemented. > >> > >> When you are using event time, watermarks need to travel through the > topology denoting the current event time. When you source terminates, the > system will send a watermark with Long.MAX_VALUE through the topology. This > will effectively trigger the completion of all pending event time > operations. > >> > >> In the case of processing time, Flink does not do this. Instead it > simply relies on the processing time clocks on each machine. Hence, there > is no way for Flink to tell the different machines that their respective > processing time clocks should proceed to a certain time in case of a > shutdown. Instead you should make sure that you don't terminate the job > before a certain time (processing time) has passed. You could do this by > adding a sleep to your source function after you've output all records and > just before leaving the source loop. > >> > >> Cheers, > >> Till > >> > >> On Tue, May 7, 2019 at 11:49 PM Steven Nelson <snel...@sourceallies.com> > wrote: > >>> > >>> Hello! > >>> > >>> I am trying to write a test that runs in the TestEnviroment. I create > a process that uses ProcessingTime, has a source constructed from a > FromElementsFunction and runs data through a Keyed Stream into a > ProcessingTimeSessionWindows.withGap(). > >>> > >>> The problem is that it appears that the env.execute method returns > immediately after the session closes, not allowing the events to be > released from the window before shutdown occurs. This used to work when I > used EventTime. > >>> > >>> Thoughts? > >>> -Steve >