Hi Gregory,

What was the cause of your problem? It would be great if you could share your experience, which I think will definitely help others.
On Thu, Jun 28, 2018 at 11:30 AM, Gregory Fee <g...@lyft.com> wrote:

> Yep, it was definitely a watermarking issue. I have that sorted out now.
> Thanks!
>
> On Wed, Jun 27, 2018 at 6:49 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Gregory,
>>
>> Since you are using a rowtime OVER window, it is probably a watermark
>> problem. The window only emits output when the watermark makes progress.
>> You can use processing time (instead of row time) to verify this
>> assumption. Also, make sure there is data in each of your source
>> partitions; the watermark makes no progress if one of the source
>> partitions has no data. An operator's current event time is the minimum
>> of its input streams' event times [1].
>>
>> Best, Hequn
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html
>>
>> On Thu, Jun 28, 2018 at 1:58 AM, Gregory Fee <g...@lyft.com> wrote:
>>
>>> Thanks for your answers! Yes, it was based on watermarks.
>>>
>>> Fabian, the state does indeed grow quite a bit in my scenario; I've
>>> observed it in the range of 5 GB. That doesn't seem to be an issue in
>>> itself. However, in my scenario I'm loading a lot of data from a
>>> historic store that is only partitioned by day, so a full day's worth
>>> of data is loaded into the system before the watermark advances. At
>>> that point the checkpoints stall indefinitely, with a couple of the
>>> tasks in the 'over' operator never acknowledging. Any thoughts on what
>>> would cause that, or how to address it?
>>>
>>> On Wed, Jun 27, 2018 at 2:20 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> The OVER window operator can only emit results when the watermark
>>>> advances, due to SQL semantics, which define that all records with
>>>> the same timestamp need to be processed together. Can you check
>>>> whether the watermarks make sufficient progress?
>>>>
>>>> Btw., did you observe state size or IO issues?
>>>> The OVER window operator also needs to store the whole window
>>>> interval in state, i.e., 14 days in your case, in order to be able
>>>> to retract the data from the aggregates after 14 days. Every time
>>>> the watermark moves, the operator iterates over all timestamps (per
>>>> key) to check which records need to be removed.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-06-27 5:38 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>>>
>>>>> Hi Greg,
>>>>>
>>>>> Based on a quick test I cannot reproduce the issue; it emits
>>>>> messages correctly in the ITCase environment. Can you share more
>>>>> information? Does the same problem happen if you use proctime?
>>>>> I am guessing this could be highly correlated with the watermark
>>>>> strategy of your input streams "user_things" and "user_stuff".
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> On Tue, Jun 26, 2018 at 6:37 PM Gregory Fee <g...@lyft.com> wrote:
>>>>>
>>>>>> Hello User Community!
>>>>>>
>>>>>> I am running some streaming SQL that involves a UNION ALL into an
>>>>>> OVER window, similar to the below:
>>>>>>
>>>>>> SELECT
>>>>>>   user_id,
>>>>>>   COUNT(action) OVER (
>>>>>>     PARTITION BY user_id
>>>>>>     ORDER BY rowtime
>>>>>>     RANGE INTERVAL '14' DAY PRECEDING
>>>>>>   ) AS action_count,
>>>>>>   rowtime
>>>>>> FROM (
>>>>>>   SELECT rowtime, user_id, thing AS action FROM user_things
>>>>>>   UNION ALL
>>>>>>   SELECT rowtime, user_id, stuff AS action FROM user_stuff
>>>>>> )
>>>>>>
>>>>>> The SQL generates three operators: two operators that process the
>>>>>> FROM part of the clause, feeding into an 'over' operator. I notice
>>>>>> that messages flow into the 'over' operator and just buffer there
>>>>>> for a long time (hours in some cases). Eventually something happens
>>>>>> and the data starts to flush through to the downstream operators.
>>>>>> Can anyone help me understand what is causing that behavior? I
>>>>>> want the data to flow through more consistently.
>>>>>>
>>>>>> Thanks!
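[Editor's note: the buffering described above, and Hequn's point that an operator's current event time is the minimum of its input streams' event times, can be illustrated with a toy model. This is plain Python, not Flink code; the function name and timestamps are invented for illustration.]

```python
# Toy model: a downstream operator's event-time clock is the minimum of
# its input partitions' watermarks, so one partition with no data stalls
# event-time progress (and thus OVER-window output) for everyone.

def operator_watermark(partition_watermarks):
    """Current event time of a downstream operator (min over inputs)."""
    return min(partition_watermarks)

# Three source partitions; partition 2 has received no data yet, so its
# watermark is still at the initial value (very far in the past).
NO_DATA = float("-inf")
partitions = [1_530_000_000_000, 1_530_000_050_000, NO_DATA]

print(operator_watermark(partitions))  # -inf: the OVER window cannot fire

# Once the idle partition finally sees data, progress resumes.
partitions[2] = 1_530_000_040_000
print(operator_watermark(partitions))  # 1530000000000
```

This is consistent with the eventual "flush" Gregory observed: nothing is emitted until the slowest input's watermark catches up.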
>>>>>>
>>>>>> --
>>>>>> Gregory Fee
>>>>>> Engineer
>>>>>> 425.830.4734
>>>>>> Lyft <http://www.lyft.com>
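[Editor's note: Fabian's description of the OVER window state — the operator keeps the whole 14-day interval per key and, each time the watermark moves, iterates stored timestamps to retract expired rows — can be sketched as follows. This is a simplified plain-Python model, not Flink internals; the class and method names are invented for illustration.]

```python
from collections import deque

WINDOW_MS = 14 * 24 * 60 * 60 * 1000  # 14 days, as in the query above


class RangeCountPerKey:
    """Toy per-key state for COUNT(...) OVER (RANGE INTERVAL '14' DAY PRECEDING)."""

    def __init__(self):
        self.rows = deque()  # (timestamp, value) in event-time order
        self.count = 0

    def accumulate(self, ts, value):
        # New row enters the aggregate immediately...
        self.rows.append((ts, value))
        self.count += 1

    def on_watermark(self, watermark):
        # ...but retraction only happens when the watermark advances:
        # iterate the stored timestamps and drop rows that have fallen
        # out of the 14-day range. This is why state grows with the
        # window interval, and why watermark stalls keep it large.
        while self.rows and self.rows[0][0] < watermark - WINDOW_MS:
            self.rows.popleft()
            self.count -= 1
        return self.count
```

Under this model, a full day of backfilled data (as in Gregory's historic-store scenario) accumulates in state before any watermark advance triggers retraction, which matches the large state and checkpoint pressure discussed above.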