Hi Michal,

you need to implement a source which does not terminate. Take a look at the
InifiteSource [1] which does exactly this. That way there won't be a
Long.MAX_VALUE being sent when closing the source operator.

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java#L240

Cheers,
Till

On Fri, Aug 9, 2019 at 12:00 PM Michal Klempa <michal.kle...@gmail.com>
wrote:

> Hi guys,
> I have opposite issue :-) I would like to unit test negative behavior
> - that the Event Time timer is not fired when no further event arrives
> (which would advance the watermarks).
> But due to StreamSource firing Long.MAX_VALUE watermark after enclosed
> finite FromElementsFunction run method depletes, I was get all timers
> fired in tested operator (and ProcessFunction).
> Is anybody aware of a method, how to test or design Source Function
> prevent watermark advancing, so that the timers are not fired (which
> is desired behavior - to be guarded by unit tests).
> Thanks.
> Michal
>
> On Thu, May 9, 2019 at 9:02 AM Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > Hi Steve,
> >
> > afaik there is no such thing in Flink. I agree that Flink's testing
> utilities should be improved. If you implement such a source, then you
> might be able to contribute it back to the community. That would be super
> helpful.
> >
> > Cheers,
> > Till
> >
> > On Wed, May 8, 2019 at 6:40 PM Steven Nelson <snel...@sourceallies.com>
> wrote:
> >>
> >>
> >> That’s what I figured was happening :( Your explanation is a lot better
> than what I gave to my team, so that will help a lot, thank you!
> >>
> >> Is there a testing source already created that does this sort of thing?
> The Flink-testing library seems a bit sparse.
> >>
> >> -Steve
> >>
> >> Sent from my iPhone
> >>
> >> On May 8, 2019, at 9:33 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> >>
> >> Hi Steve,
> >>
> >> I think the reason for the different behaviour is due to the way event
> time and processing time are implemented.
> >>
> >> When you are using event time, watermarks need to travel through the
> topology denoting the current event time. When you source terminates, the
> system will send a watermark with Long.MAX_VALUE through the topology. This
> will effectively trigger the completion of all pending event time
> operations.
> >>
> >> In the case of processing time, Flink does not do this. Instead it
> simply relies on the processing time clocks on each machine. Hence, there
> is no way for Flink to tell the different machines that their respective
> processing time clocks should proceed to a certain time in case of a
> shutdown. Instead you should make sure that you don't terminate the job
> before a certain time (processing time) has passed. You could do this by
> adding a sleep to your source function after you've output all records and
> just before leaving the source loop.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Tue, May 7, 2019 at 11:49 PM Steven Nelson <snel...@sourceallies.com>
> wrote:
> >>>
> >>> Hello!
> >>>
> >>> I am trying to write a test that runs in the TestEnviroment. I create
> a process that uses ProcessingTime, has a source constructed from a
> FromElementsFunction and runs data through a Keyed Stream into a
> ProcessingTimeSessionWindows.withGap().
> >>>
> >>> The problem is that it appears that the env.execute method returns
> immediately after the session closes, not allowing the events to be
> released from the window before shutdown occurs. This used to work when I
> used EventTime.
> >>>
> >>> Thoughts?
> >>> -Steve
>

Reply via email to