Hi,
I have a data stream processing problem in which the source generates a
record every 8 milliseconds, and the timestamp is a field of each record.
With Flink's default time characteristic (processing time), records enter
the time windows, but when I set the time characteristic to EventTime the
job outputs nothing! Here is the code:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    SingleOutputStreamOperator<Tuple3<String, Long, JSONObject>> res =
        aggregatedTuple
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, JSONObject>>(
                        Time.milliseconds(8)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, Long, JSONObject> element) {
                        return element.f1;
                    }
                })
            .keyBy(1)
            .timeWindow(Time.milliseconds(8))
            .allowedLateness(Time.milliseconds(3))
            .sideOutputLateData(lateOutputTag)
            .reduce(processing...);

    DataStream<Tuple3<String, Long, JSONObject>> lateData =
        res.getSideOutput(lateOutputTag);

    res.print();

What is the problem with my code?

According to the Flink documentation, my understanding of EventTime is as
follows. With a time window, when a new element arrives, Flink starts a new
(logical) window based on that element's event timestamp, waits 8
milliseconds (per my code) to see whether any other elements with the same
key arrive, and triggers the window 8 milliseconds after the timestamp of
the window's first element. Since the source generates data at a constant,
periodic interval, I also set the watermark bound (the maximum
out-of-orderness) to 8 milliseconds. Is my understanding of Flink windows
in EventTime correct?
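
For comparison, here is a minimal plain-Java sketch (no Flink dependency)
of a different reading of the documentation. It assumes that tumbling
windows are aligned to the epoch rather than to the first element. It also
assumes that the watermark trails the largest timestamp seen so far by the
out-of-orderness bound, and that a window fires once the watermark reaches
the window's last included timestamp. The class and method names are
hypothetical and only illustrate the arithmetic. They are not Flink's API.

```java
public class EventTimeWindowSketch {

    // Start of the tumbling window that contains `timestamp`.
    // Assumption: windows are aligned to the epoch (offset 0), timestamp >= 0.
    public static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Assumption: the watermark trails the largest timestamp seen so far
    // by the configured out-of-orderness bound.
    public static long watermark(long maxTimestampSeen, long maxOutOfOrderness) {
        return maxTimestampSeen - maxOutOfOrderness;
    }

    // Assumption: a window [start, start + size) fires once the watermark
    // reaches its last included timestamp, start + size - 1.
    public static boolean windowFires(long windowStart, long windowSize, long watermark) {
        return watermark >= windowStart + windowSize - 1;
    }

    public static void main(String[] args) {
        long size = 8;   // window size in milliseconds, as in my job
        long bound = 8;  // out-of-orderness bound in milliseconds, as in my job
        long firstWindow = windowStart(1000, size);

        // One element every 8 ms, starting at an arbitrary timestamp of 1000.
        for (long ts = 1000; ts <= 1024; ts += 8) {
            long wm = watermark(ts, bound);
            System.out.println("event ts=" + ts
                + " window start=" + windowStart(ts, size)
                + " watermark=" + wm
                + " first window fired=" + windowFires(firstWindow, size, wm));
        }
    }
}
```

Under these assumptions, the window containing timestamp 1000 is not
triggered until an element with timestamp 1015 or later has been seen (in
this stream, the element at 1016). It is then triggered only because later
elements keep arriving and advance the watermark.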