Hi Soheil, You can monitor the watermarks in the web dashboard as Fabian said. There are some documents here[1].
[1] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time On Thu, Jul 19, 2018 at 3:53 PM, Fabian Hueske <[email protected]> wrote: > Hi Soheil, > > Hequn is right. This might be an issue with advancing event-time. > You can monitor that by checking the watermarks in the web dashboard or > print-debug it with a ProcessFunction which can lookup the current > watermark. > > Best, Fabian > > 2018-07-19 3:30 GMT+02:00 Hequn Cheng <[email protected]>: > >> Hi Soheil, >> >> > wait 8 milliseconds (according to my code) to see if any other data >> with the same key received or not and after 8 millisecond it will be >> triggered. >> Yes, but the time is event time, so if there is no data from source the >> time won't be advanced. >> >> There are some reasons why the event time has not been advanced: >> 1. There are no data from the source >> 2. One of the source parallelisms doesn't have data >> 3. The time field, i.e, Long in Tuple3, should be millisecond instead of >> second. >> 4. Data should cover a longer time spam than the window size to advance >> the event time. >> >> Best, Hequn >> >> On Wed, Jul 18, 2018 at 3:53 PM, Soheil Pourbafrani < >> [email protected]> wrote: >> >>> Hi, >>> >>> In a datastream processing problem, the source generated data every 8 >>> millisecond and timestamp is a field of the data. In default Flink time >>> behavior data enter the time window but when I set Flink time to EventTime >>> it will output nothing! Here is the code: >>> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >>> >>> SingleOutputStreamOperator<Tuple3<String,Long, JSONObject>> res = >>> aggregatedTuple >>> .assignTimestampsAndWatermarks(new >>> BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, >>> JSONObject>>(Time.milliseconds(8)) { >>> >>> @Override >>> public long extractTimestamp(Tuple3<String, Long, JSONObject> >>> element) { >>> return element.f1 ; >>> } >>> }).keyBy(1).timeWindow(Time.milliseconds(8)) >>> .allowedLateness(Time.milliseconds(3)) >>> .sideOutputLateData(lateOutputTag) >>> .reduce(processing...); >>> DataStream<Tuple3<String, Long, JSONObject>> lateData = >>> res.getSideOutput(lateOutputTag); >>> res.print(); >>> >>> What is the problem with my code? >>> According to the Flink documents, my understanding about EventTime is >>> that for example in case of time window when a new data received it start a >>> new (logical window) based on new data event timestamp and wait 8 >>> milliseconds (according to my code) to see if any other data with the same >>> key received or not and after 8 millisecond (from timestamp of the first >>> element of the window) it will be triggered. Since data source generated >>> data in a constant periodic interval, I set a watermarck of 8, too. Is my >>> understanding about Flink window in EventTime correct? >>> >> >> >
