On Fri, Jan 17, 2020 at 11:44 AM Stephen Patel <stephenpate...@gmail.com>
wrote:

> * What is the space of keys? Is it constantly growing?
>
> The keys are effectively database row ids, so they're always growing,
> or being updated if the rows are modified.  Each row is small.
>
> * Are you explicitly clearing out state from stale keys?
>
> No, once a key is entered, I keep the value for it until it's updated
> with a new value.  This is so that if the value changes, or we get an
> out of order version of that value, we can issue the appropriate
> changes/retractions.
>

In this case your state will continue to grow without bound. I don't know
the details of Flink's checkpointing. If checkpoints are incremental, this
may not be a problem, but it could be.
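To make the growth concrete, here is a minimal plain-Java sketch of per-key "latest value" state held in the global window. It is not Beam or Flink code. It assumes the keys are row ids and adds a hypothetical time-to-live policy for stale keys. The class `LatestValueState`, its methods, and the TTL rule are illustrative names introduced here. In an actual Beam pipeline, the comparable mechanism would be a stateful `DoFn` that calls `clear()` on its state from an `@OnTimer` callback. The model only shows why, without such cleanup, state size tracks the number of distinct keys ever seen.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical plain-Java model of per-key state held in the global window.
// NOT Beam or Flink code: it only illustrates why state grows with the key space.
class LatestValueState {
    // Latest value per key (e.g. a database row id), kept so a later change can be retracted.
    private final Map<Long, String> values = new HashMap<>();
    // Time (ms) of the last update per key; used only by the assumed expiry policy below.
    private final Map<Long, Long> lastUpdated = new HashMap<>();

    // Stores the new value and returns the previous one (null for a key never seen before).
    String update(long key, String value, long nowMillis) {
        lastUpdated.put(key, nowMillis);
        return values.put(key, value);
    }

    // Assumed cleanup policy (hypothetical): drop keys not updated for at least ttlMillis.
    // Returns how many keys were removed.
    int expireStale(long ttlMillis, long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<Long, Long>> it = lastUpdated.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= ttlMillis) {
                values.remove(entry.getKey());
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    // Number of keys currently held; without expireStale this only ever grows.
    int size() {
        return values.size();
    }
}
```

Fed 50 new row ids per minute for an hour, this model holds 3,000 keys until something expires them. At roughly 50 messages per minute, if most messages carried new row ids, the key count would rise by about 72,000 per day (50 × 60 × 24). The trade-off is that once a key is expired, a late or out-of-order update for that row can no longer be compared against its old value, which is what the retractions described above depend on. A TTL therefore only fits if such updates stop arriving after some bounded period.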

Kenn


> I can share a small variant of the code that doesn't involve any
> statefulness (apart from the implicit and temporary statefulness of
> the cogroup/deduplication operator):
> https://gist.github.com/spatel11/ced0d175ca64962e0ec734a857d1ef33
>
> This code can create two types of pipelines.  One fills two SQS queues
> at different rates, the other reads from those queues, cogroups them,
> and generates some metrics.  The read pipeline will operate fine for.
>
> I've also included the flink-conf that I was using.
>
> Stephen
>
> On Fri, Jan 17, 2020 at 11:47 AM Kenneth Knowles <k...@apache.org> wrote:
> >
> > Starting with just checking some basic things: What is the space of
> keys? Is it constantly growing? Are you explicitly clearing out state from
> stale keys? In the global window, you don't get any state GC for free.
> >
> > Can you share repro code?
> >
> > Kenn
> >
> > On Fri, Jan 17, 2020 at 8:53 AM Stephen Patel <stephenpate...@gmail.com>
> wrote:
> >>
> >> I've got a Beam pipeline using the FlinkRunner that reads from two
> >> different SQS sources (using the SqsIO).  It does some stateful
> >> processing on each stream, and then cogroups the results together to
> >> generate a result and write it to SNS (using the SnsIO).  The volume
> >> of input data isn't particularly large (about 50 messages per minute
> >> on one queue, and about 1 message per minute on the other queue).
> >> It's using the Global window, discarding fired panes, with a
> >> processing time trigger delayed by 1 minute.  Checkpointing is enabled
> >> at a 1 minute interval, with a minimum delay between checkpoints of 30
> >> seconds.  My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
> >> predefined options.
> >>
> >> This pipeline runs fine for a few hours with an average checkpoint
> >> duration of 1s (with occasional higher spikes), but eventually the
> >> time it takes to checkpoint begins to grow until it's in the minutes
> >> on average, and finally it won't even complete within a 10-minute
> >> period.  I'm using a parallelism of 2, and it seems to keep up with
> >> the number of incoming messages just fine (until the checkpoint
> >> duration grows too large and it is unable to delete the messages any
> >> longer).  To try to isolate the problem, I wrote an alternate SQS
> >> reader that uses the Watch transform to periodically read from SQS.
> >> This variant doesn't show the same behavior, and has been running for
> >> a week without issue (an average checkpoint time of 1-2s).
> >>
> >> Some other experiments I tried:
> >>
> >> * I observed that the operators that took a long time to checkpoint
> >> were the deduplicating operators after the actual unbounded source
> >> operator.  I disabled requiresDeduping and added a Reshuffle instead,
> >> but that exhibited the same growth in checkpoint durations after a
> >> period of time.
> >> * I tried the AT_LEAST_ONCE checkpointing mode instead of
> >> EXACTLY_ONCE, but that also exhibited the same behavior.
> >>
> >>
> >> Does anyone have any thoughts about what might cause this behavior
> >> with the Unbounded Source (as opposed to the splittable DoFn variant)?
> >>
> >> I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.
> >>
> >> Stephen
>