Hequn,

Thanks for the help. It is indeed a watermark problem. In the Flink UI I can
see the low watermark value for each operator, and the groupBy operator's
watermark is lagging behind. I checked the link from SO and confirmed that:
1. I do see records coming in for this operator.
2. I have parallelism = 32 and only one task receives records. Can you
please elaborate on why this would affect watermark advancement? (See also
my sketch below.)
3. Event creation time is in ms.
4. The data spans more time than the window size. I don't quite understand
why this matters.
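
Regarding point 2, my current understanding is that an operator's watermark
is the minimum over the watermarks of all its input subtasks, so if 31 of
the 32 extraction tasks never see a record, their watermarks never advance
and the downstream minimum stays stuck. Would something like the sketch
below be a reasonable workaround? (Just a sketch on my side, reusing my
Event type with a created_at field in epoch ms; the idle fallback mixes
processing time into the watermark, so it trades some event-time
correctness for progress.)

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class IdleAwareTimestampExtractor(slack: Long, maxIdleMs: Long)
    extends AssignerWithPeriodicWatermarks[Event] {

  // Highest slack-adjusted event timestamp seen by this subtask so far.
  private var currentMaxTimestamp: Long = 0L
  // Wall-clock time of the last record, used to detect an idle subtask.
  private var lastRecordAt: Long = System.currentTimeMillis()

  override def extractTimestamp(e: Event, prevElementTimestamp: Long): Long = {
    lastRecordAt = System.currentTimeMillis()
    currentMaxTimestamp = Math.max(e.created_at - slack, currentMaxTimestamp)
    e.created_at
  }

  override def getCurrentWatermark(): Watermark = {
    val now = System.currentTimeMillis()
    if (now - lastRecordAt > maxIdleMs) {
      // Idle subtask: advance with the wall clock (minus slack) so this
      // subtask stops holding back the operator-wide minimum watermark.
      currentMaxTimestamp = Math.max(currentMaxTimestamp, now - slack)
    }
    new Watermark(currentMaxTimestamp)
  }
}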

Thanks,
Fanbin

On Tue, Jul 23, 2019 at 7:17 PM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Fanbin,
>
> Fabian is right, it should be a watermark problem. Probably, some tasks of
> the source don't have enough data to advance the watermark. Furthermore,
> you can also monitor the event time through the Flink web interface.
> I answered a similar question on Stack Overflow; see more details here [1].
>
> Best, Hequn
>
> [1]
> https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger
>
> On Wed, Jul 24, 2019 at 4:38 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> If I use proctime, the groupBy happens without any delay.
>>
>> On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu <fanbin...@coinbase.com>
>> wrote:
>>
>>> not sure whether this is related:
>>>
>>> public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
>>>       AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
>>>
>>>    // match parallelism to input, otherwise dop=1 sources could lead to some strange
>>>    // behaviour: the watermark will creep along very slowly because the elements
>>>    // from the source go to each extraction operator round robin.
>>>    final int inputParallelism = getTransformation().getParallelism();
>>>    final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
>>>
>>>    TimestampsAndPeriodicWatermarksOperator<T> operator =
>>>          new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
>>>
>>>    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
>>>          .setParallelism(inputParallelism);
>>> }
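>>>
>>> Reading that comment, every extraction subtask has to see data before the
>>> downstream minimum watermark can move. As a thought experiment (assuming a
>>> Kafka source, which may not apply here; "events", deserializer and props
>>> below are placeholders), would attaching the assigner to the consumer
>>> itself, so watermarks are tracked per partition rather than per
>>> round-robin subtask, sidestep this?
>>>
>>> val consumer = new FlinkKafkaConsumer[Event]("events", deserializer, props)
>>> // Watermarks are then generated per Kafka partition inside the source,
>>> // instead of by a separate round-robin Timestamps/Watermarks operator.
>>> consumer.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
>>> val events = env.addSource(consumer)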
>>>
>>> parallelism is set to 32
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>> env.setParallelism(32)
>>>
>>> and the command for launching the job is
>>>
>>> flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
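>>>
>>> (For what it's worth: -yn 4 TaskManagers x -ys 8 slots each = 32 slots,
>>> which exactly matches parallelism 32, so all 32 extraction subtasks
>>> should actually be scheduled.)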
>>>
>>>
>>>
>>>
>>> On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu <fanbin...@coinbase.com>
>>> wrote:
>>>
>>>> Thanks, Fabian, for the prompt reply. I just started using Flink, and
>>>> this is a great community.
>>>> The watermark setting only accounts for a 10-second delay. Besides that,
>>>> the job runs fine locally in IntelliJ without any issues.
>>>>
>>>> Here is the code:
>>>>
>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
>>>> import org.apache.flink.streaming.api.watermark.Watermark
>>>>
>>>> class EventTimestampExtractor(slack: Long = 0L) extends AssignerWithPeriodicWatermarks[Event] {
>>>>
>>>>   // Highest slack-adjusted event timestamp seen so far; drives the watermark.
>>>>   var currentMaxTimestamp: Long = 0L
>>>>
>>>>   override def extractTimestamp(e: Event, prevElementTimestamp: Long): Long = {
>>>>     val elemTs = e.created_at
>>>>     currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>>>>     elemTs
>>>>   }
>>>>
>>>>   // Called periodically; the watermark lags the max timestamp by `slack` ms.
>>>>   override def getCurrentWatermark(): Watermark = new Watermark(currentMaxTimestamp)
>>>> }
>>>>
>>>> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
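>>>>
>>>> To put numbers on it: with slack = 10000, an event with created_at =
>>>> 12:00:29.000 moves the watermark to at most 12:00:19.000, so a window
>>>> ending at 12:00:30 fires only once some subtask sees an event with
>>>> created_at >= 12:00:40.000. That alone explains a ~10 second delay, not
>>>> 10 minutes.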
>>>>
>>>> Are there any other things I should be aware of?
>>>>
>>>> Thanks again for your kind help!
>>>>
>>>> Fanbin
>>>>
>>>>
>>>> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Fanbin,
>>>>>
>>>>> The delay is most likely caused by the watermark delay.
>>>>> A window is computed when the watermark passes the end of the window.
>>>>> If you configured the watermark to be 10 minutes before the current max
>>>>> timestamp (probably to account for out-of-order data), then the window
>>>>> will be computed with a delay of approx. 10 minutes.
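>>>>> For example, a window covering [11:50:00, 12:00:00) is only evaluated
>>>>> once the watermark reaches 12:00:00, i.e. once an event with a
>>>>> timestamp of at least 12:10:00 has been seen somewhere upstream.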
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> On Tue, Jul 23, 2019 at 2:00 AM Fanbin Bu <
>>>>> fanbin...@coinbase.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I have a Flink sql streaming job defined by:
>>>>>>
>>>>>> SELECT
>>>>>>   user_id
>>>>>>   , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>>>>>>   , count(name) as count
>>>>>> FROM event
>>>>>> WHERE name = 'signin'
>>>>>> GROUP BY
>>>>>>   user_id
>>>>>>   , hop(created_at, interval '30' second, interval '1' minute)
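>>>>>>
>>>>>> (hop() here defines windows of size 1 minute that slide every 30
>>>>>> seconds, so each row falls into two overlapping windows; e.g. a row
>>>>>> with created_at = 12:00:15 belongs to [11:59:30, 12:00:30) and
>>>>>> [12:00:00, 12:01:00), and hop_end returns each window's end timestamp.)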
>>>>>>
>>>>>>
>>>>>> There is a noticeable delay in the groupBy operator. For example, I
>>>>>> only see records emitted about 10 minutes after they were received;
>>>>>> see the attached picture.
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> I am expecting to see the group-by result after one minute, since the
>>>>>> sliding window size is 1 min and the slide is 30 sec.
>>>>>>
>>>>>> There is no such issue if I run the job locally in IntelliJ. However,
>>>>>> I run into the above issue when the job runs on EMR (Flink version 1.7).
>>>>>>
>>>>>> Can anybody give me a clue as to what could be wrong?
>>>>>> Thanks,
>>>>>>
>>>>>> Fanbin
>>>>>>
>>>>>
