Hi Vinod,

This sounds like a watermark issue to me.
The commonly used watermark strategies (like bounded out-of-order) only
advance when a new record arrives.
Moreover, an operator's current watermark is the minimum of the current
watermarks of all its input partitions.
So, the watermark only moves forward if the watermark of the "most-behind"
partition advances.
If you have many parallel partitions and only very few records per hour,
it might take a long time until "the right" partition processes a new
record and hence advances its watermark.
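
For reference, a minimal sketch of such a bounded out-of-order strategy
with the Flink 1.4 DataStream API (the Event type and its getRowtime()
accessor are assumptions for illustration). Note that the watermark it
produces only advances when a new record passes through the extractor:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Watermark = highest event timestamp seen so far, minus 10s of
    // allowed out-of-orderness; it only moves when new records arrive.
    DataStream<Event> withWatermarks = events.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Event e) {
                return e.getRowtime(); // event-time field, assumed name
            }
        });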

My recommendation would be to ensure that you have only one source that
reads the records and assigns watermarks (maybe even keep the parallelism
of the whole query at 1, if possible).
Moreover, you might want to think about a more aggressive watermarking
strategy that advances based on processing time even when no data is
received.
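
As a sketch of what such a strategy could look like on 1.4 (again with an
assumed Event type), you can implement AssignerWithPeriodicWatermarks so
that the watermark trails the local wall clock instead of the record
timestamps:

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class ProcessingTimeBoundedWatermarks
            implements AssignerWithPeriodicWatermarks<Event> {

        private final long maxDelayMillis;

        public ProcessingTimeBoundedWatermarks(long maxDelayMillis) {
            this.maxDelayMillis = maxDelayMillis;
        }

        @Override
        public long extractTimestamp(Event e, long previousElementTimestamp) {
            return e.getRowtime(); // event-time field, assumed name
        }

        @Override
        public Watermark getCurrentWatermark() {
            // Advance with the wall clock so event time keeps progressing
            // even when no records arrive.
            return new Watermark(System.currentTimeMillis() - maxDelayMillis);
        }
    }

The trade-off is that records arriving more than maxDelayMillis behind the
wall clock are treated as late, so this only works if your sources are
never delayed for longer than that.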

Best, Fabian

On Sun, Aug 25, 2019 at 8:51 PM Vinod Mehra <vme...@lyft.com> wrote:

> Even when there are new events, the old events just stay stuck for many
> hours (more than a day). So if there is buffering going on, it seems it
> is not time based but size based (?). It looks like the buffered events
> don't get flushed out unless they exceed a certain threshold (?). Is that
> what is going on? Can someone confirm? Is there a way to flush
> periodically?
>
> Thanks,
> Vinod
>
> On Fri, Aug 23, 2019 at 10:37 PM Vinod Mehra <vme...@lyft.com> wrote:
>
>> Things improved during bootstrapping, when the event volume was larger.
>> But as soon as the traffic slowed down, the events started getting stuck
>> (buffered?) at the OVER operator for a very long time. Several hours.
>>
>> On Fri, Aug 23, 2019 at 5:04 PM Vinod Mehra <vme...@lyft.com> wrote:
>>
>>> (Forgot to mention that we are using Flink 1.4)
>>>
>>> Update: Earlier the OVER operator was assigned a parallelism of 64. I
>>> reduced it to 1 and the problem went away! Now the OVER operator is not
>>> filtering/buffering the events anymore.
>>>
>>> Can someone explain this please?
>>>
>>> Thanks,
>>> Vinod
>>>
>>> On Fri, Aug 23, 2019 at 3:09 PM Vinod Mehra <vme...@lyft.com> wrote:
>>>
>>>> We have a SQL based Flink job which consumes a very low volume stream
>>>> (1 or 2 events every few hours):
>>>>
>>>> SELECT user_id,
>>>>        COUNT(*) OVER (PARTITION BY user_id ORDER BY rowtime
>>>>            RANGE INTERVAL '30' DAY PRECEDING) AS count_30_days,
>>>>        COALESCE(occurred_at, logged_at) AS latency_marker,
>>>>        rowtime
>>>> FROM event_foo
>>>> WHERE user_id IS NOT NULL
>>>>
>>>> According to the Flink dashboard, the OVER operator seems to filter
>>>> out events (records received = <non-zero-number>, records sent = 0).
>>>>
>>>> The operator looks like this:
>>>>
>>>> over: (PARTITION BY: $1, ORDER BY: rowtime, RANGEBETWEEN 2592000000
>>>> PRECEDING AND CURRENT ROW, select: (rowtime, $1, $2, COUNT(*) AS w0$o0))
>>>> -> select: ($1 AS user_id, w0$o0 AS count_30_days, $2 AS latency_marker,
>>>> rowtime) -> to: Tuple2 -> Filter -> group_counter_count_30d.1.sqlRecords
>>>> -> sample_without_formatter
>>>>
>>>> I know that the OVER operator can discard late-arriving events, but
>>>> these events are definitely not arriving late. The watermark for all
>>>> operators stays at 0 because the number of output events is 0.
>>>>
>>>> We have an identical SQL job running against a high volume stream that
>>>> is working fine. Watermarks progress in a timely manner and events are
>>>> delivered in a timely manner as well.
>>>>
>>>> Any idea what could be going wrong? Are the events getting buffered,
>>>> waiting for a certain number of events to accumulate? If so, what is
>>>> the threshold?
>>>>
>>>> Thanks,
>>>> Vinod
>>>>
>>>
