Hi Hung,
I see one thing that could explain the problem, the timestamp assigner should
look like this:
new AssignerWithPeriodicWatermarks<BizEvent>() {
long curTimeStamp;
@Override
public long extractTimestamp(BizEvent biz, long
currentTimestamp) {
curTimeStamp = Math.max(curTimeStamp,
biz.time.getMillis());
return biz.time.getMillis();
}
@Override
public long getCurrentWatermark() {
return (curTimeStamp - (maxEventDelay * 1000));
}
}
The currentTimestamp parameter is the internal timestamp that the element had
before, which is most likely just “-1” because no timestamp was previously
assigned.
Does it work with that fix?
Cheers,
Aljoscha
> On 25 Feb 2016, at 17:26, HungChang <[email protected]> wrote:
>
> An update. The following situation works as expected. The data arrives after
> Flink job starts to execute.
> 1> (2016-02-25T17:46:25.00,13)
> 2> (2016-02-25T17:46:40.00,16)
> 3> (2016-02-25T17:46:50.00,11)
> 4> (2016-02-25T17:47:10.00,12)
>
> But for the data arrives long time before. Strange behavior appears. Does it
> mean we cannot reply the computation?
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5156.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at
> Nabble.com.