Hi Eron,

Thanks for your help. I do know that max out-of-orderness and lateness are based on event time, but in my test they do not seem to be. Here are my test data and my code.

    "key1|1483250640000|",
    "key1|1483250636000|",
    "key1|1483250649000|",
    "key1|1483250642000|",
    "key1|1483250650000|",
    "key1|1483250641000|",
    "key1|1483250653000|",
    "key1|1483250648000|",
    "key1|1483250645000|",
    "key1|1483250658000|",
    "key1|1483250647000|",
    "key1|1483250643000|",
    "key1|1483250661000|",
    "key1|1483250662000|",
    "key1|1483250667000|",
    "key1|1483250663000|",
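For reference, the purely event-time grouping of this test data can be worked out without Flink at all. The following is only a minimal sketch: the class and method names are hypothetical, and it assumes 5-second tumbling windows with zero offset, so that a window starts at the timestamp rounded down to a multiple of 5000 ms. It ignores watermarks and lateness entirely and only shows which window each element belongs to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: groups timestamps into
// 5-second tumbling windows by plain arithmetic on the event time.
class WindowAssignmentSketch {
    static final long WINDOW_SIZE_MS = 5_000L;

    // The test timestamps, in the order they are sent.
    static final long[] TEST_DATA = {
        1483250640000L, 1483250636000L, 1483250649000L, 1483250642000L,
        1483250650000L, 1483250641000L, 1483250653000L, 1483250648000L,
        1483250645000L, 1483250658000L, 1483250647000L, 1483250643000L,
        1483250661000L, 1483250662000L, 1483250667000L, 1483250663000L
    };

    // Assumption: zero window offset, so the start is the timestamp
    // rounded down to a multiple of the window size.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // Maps each window start to its elements, preserving arrival order
    // inside a window; the TreeMap keeps windows sorted by start time.
    static Map<Long, List<Long>> assign(long[] timestamps) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        assign(TEST_DATA).forEach((start, elements) ->
            System.out.println(start + " -> " + elements.size() + " " + elements));
    }
}
```

Under these assumptions the data falls into seven windows holding 1, 4, 4, 2, 1, 3 and 1 elements, which is the grouping to expect when no element is dropped as late.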
    dataStream.keyBy(row -> (String) row.getField(0))
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(5))
        .fold(initRow(), new FoldFunction<Row, Row>() {
            @Override
            public Row fold(Row ret, Row o) throws Exception {
                ret.setField(0, (int) ret.getField(0) + 1);
                ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
                return ret;
            }
        })

1. Sending the data WITHOUT Thread.sleep(), the result is:

    1,1483250636000|
    4,1483250640000|1483250642000|1483250641000|1483250643000|
    4,1483250649000|1483250648000|1483250645000|1483250647000|
    2,1483250650000|1483250653000|
    1,1483250658000|
    3,1483250661000|1483250662000|1483250663000|
    1,1483250667000|

2. Sending the data WITH Thread.sleep(), the result is as follows. Here we can see allowedLateness at work: a late element triggers the window to be calculated again, so the result for that window is emitted again.

    1,1483250636000|
    1,1483250640000|
    2,1483250640000|1483250642000|
    1,1483250649000|
    2,1483250649000|1483250648000|
    3,1483250649000|1483250648000|1483250645000|
    2,1483250650000|1483250653000|
    1,1483250658000|
    2,1483250661000|1483250662000|
    3,1483250661000|1483250662000|1483250663000|
    1,1483250667000|

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/