Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread 杨力
Thank you for your response. Registering a timer at Long.MaxValue works. And I have found the mistake in my original code. When a timer fires and there are elements in the priority queue with timestamp greater than current watermark, they do not get processed. A new timer should be registered for

Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread David Anderson
The reason why the watermark is not advancing is that assignAscendingTimestamps is a periodic watermark generator. This style of watermark generator is called at regular intervals to create watermarks -- by default, this is done every 200 msec. With only a tiny bit of data to process, the job

Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread Dian Fu
Before a program close, it will emit Long.MaxValue as the watermark and that watermark will trigger all the windows. This is the reason why your `timeWindow` program could work. However, for the first program, you have not registered the event time timer(though

Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread 杨力
It seems to be the case. But when I use timeWindow or CEP with fromCollection, it works well. For example, ``` sEnv.fromCollection(Seq[Long](1, 1002, 2002, 3002)).assignAscendingTimestamps(identity[Long]) .keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print() ``` prints ``` 1 1002 2002

Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread Dian Fu
Hi, It generates watermark periodically by default in the underlying implementation of `assignAscendingTimestamps`. So for your test program, the watermark is still not generated yet and I think that's the reason why it's Long.MinValue. Regards, Dian > 在 2019年10月28日,上午11:59,杨力 写道: > >

Watermark won't advance in ProcessFunction

2019-10-27 Thread 杨力
I'm going to sort elements in a PriorityQueue and set up timers at (currentWatermark + 1), following the instructions in https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing . However, it seems that