Hi Soheil, Hequn is right. This might be an issue with advancing event-time. You can monitor that by checking the watermarks in the web dashboard or print-debug it with a ProcessFunction which can lookup the current watermark.
Best, Fabian 2018-07-19 3:30 GMT+02:00 Hequn Cheng <[email protected]>: > Hi Soheil, > > > wait 8 milliseconds (according to my code) to see if any other data > with the same key received or not and after 8 millisecond it will be > triggered. > Yes, but the time is event time, so if there is no data from source the > time won't be advanced. > > There are some reasons why the event time has not been advanced: > 1. There are no data from the source > 2. One of the source parallelisms doesn't have data > 3. The time field, i.e, Long in Tuple3, should be millisecond instead of > second. > 4. Data should cover a longer time spam than the window size to advance > the event time. > > Best, Hequn > > On Wed, Jul 18, 2018 at 3:53 PM, Soheil Pourbafrani <[email protected] > > wrote: > >> Hi, >> >> In a datastream processing problem, the source generated data every 8 >> millisecond and timestamp is a field of the data. In default Flink time >> behavior data enter the time window but when I set Flink time to EventTime >> it will output nothing! Here is the code: >> >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> >> SingleOutputStreamOperator<Tuple3<String,Long, JSONObject>> res = >> aggregatedTuple >> .assignTimestampsAndWatermarks(new >> BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, >> JSONObject>>(Time.milliseconds(8)) { >> >> @Override >> public long extractTimestamp(Tuple3<String, Long, JSONObject> >> element) { >> return element.f1 ; >> } >> }).keyBy(1).timeWindow(Time.milliseconds(8)) >> .allowedLateness(Time.milliseconds(3)) >> .sideOutputLateData(lateOutputTag) >> .reduce(processing...); >> DataStream<Tuple3<String, Long, JSONObject>> lateData = >> res.getSideOutput(lateOutputTag); >> res.print(); >> >> What is the problem with my code? >> According to the Flink documents, my understanding about EventTime is >> that for example in case of time window when a new data received it start a >> new (logical window) based on new data event timestamp and wait 8 >> milliseconds (according to my code) to see if any other data with the same >> key received or not and after 8 millisecond (from timestamp of the first >> element of the window) it will be triggered. Since data source generated >> data in a constant periodic interval, I set a watermarck of 8, too. Is my >> understanding about Flink window in EventTime correct? >> > >
