On Fri, Jan 17, 2020 at 11:44 AM Stephen Patel <stephenpate...@gmail.com> wrote:
> * What is the space of keys? Is it constantly growing?
>
> The keys are effectively database row ids, so they're always growing,
> or being updated if the rows are modified. Each row is small.
>
> * Are you explicitly clearing out state from stale keys?
>
> No, once a key is entered, I keep the value for it until it's updated
> with a new value. This is so that if the value changes, or we get an
> out-of-order version of that value, we can issue the appropriate
> changes/retractions.
>

In this case your state will continue to grow without bound. I don't know
about Flink's details. If checkpoints are incremental, then maybe this is no
problem? But maybe it is.

Kenn

> I can share a small variant of the code that doesn't involve any
> statefulness (apart from the implicit and temporary statefulness of
> the cogroup/deduplication operator):
> https://gist.github.com/spatel11/ced0d175ca64962e0ec734a857d1ef33
>
> This code can create two types of pipelines. One fills two SQS queues
> at different rates; the other reads from those queues, cogroups them,
> and generates some metrics. The read pipeline will operate fine for.
>
> I've also included the flink-conf that I was using.
>
> Stephen
>
> On Fri, Jan 17, 2020 at 11:47 AM Kenneth Knowles <k...@apache.org> wrote:
> >
> > Starting with just checking some basic things: What is the space of
> > keys? Is it constantly growing? Are you explicitly clearing out state
> > from stale keys? In the global window, you don't get any state GC for
> > free.
> >
> > Can you share repro code?
> >
> > Kenn
> >
> > On Fri, Jan 17, 2020 at 8:53 AM Stephen Patel <stephenpate...@gmail.com> wrote:
> >>
> >> I've got a Beam pipeline using the FlinkRunner that reads from two
> >> different SQS sources (using the SqsIO). It does some stateful
> >> processing on each stream, and then cogroups the results together to
> >> generate a result and write it to SNS (using the SnsIO).
> >> The volume of input data isn't particularly large (about 50 messages
> >> per minute on one queue, and about 1 message per minute on the other
> >> queue). It's using the global window, discarding fired panes, with a
> >> processing-time trigger delayed by 1 minute. Checkpointing is enabled
> >> at a 1-minute interval, with a minimum delay between checkpoints of 30
> >> seconds. My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
> >> predefined options.
> >>
> >> This pipeline runs fine for a few hours with an average checkpoint
> >> duration of 1s (with occasional higher spikes), but eventually the
> >> time it takes to checkpoint begins to grow until it's in the minutes
> >> on average, and finally it won't even complete within a 10-minute
> >> period. I'm using a parallelism of 2, and it seems to keep up with the
> >> number of incoming messages just fine (until the checkpoint duration
> >> grows too large and it is unable to delete the messages any longer).
> >> To try to isolate the problem, I wrote an alternate SQS reader that
> >> uses the Watch transform to periodically read from SQS. This variant
> >> doesn't show the same behavior, and has been running for a week
> >> without issue (an average checkpoint time of 1-2s).
> >>
> >> Some other experiments I tried:
> >>
> >> * I observed that the operators that took a long time to checkpoint
> >> were the deduplicating operators after the actual unbounded source
> >> operator. I disabled requiresDeduping and added a Reshuffle instead;
> >> however, that exhibited the same growth in checkpoint durations after
> >> a period of time.
> >> * I tried the AT_LEAST_ONCE checkpointing mode instead of
> >> EXACTLY_ONCE; however, that also exhibited the same behavior.
> >>
> >>
> >> Does anyone have any thoughts about what might cause this behavior
> >> with the Unbounded Source (as opposed to the splittable DoFn variant)?
> >>
> >> I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.
> >>
> >> Stephen
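Kenn's point that keyed state in the global window is never garbage-collected on its own, so that ever-increasing row-id keys make state grow without bound, can be illustrated with a toy model. This is a minimal sketch in plain Java, not Beam's state API: a `HashMap` stands in for per-key state, and the hypothetical `expire` method stands in for the cleanup that, in a real Beam stateful DoFn, would typically be a timer whose callback calls `clear()` on the state. The class and method names, the 60-minute TTL, and the assumption that each of the roughly 50 messages per minute carries a new row id are all illustrative assumptions, and the sketch says nothing about whether state size is what drove the checkpoint durations in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (plain Java, NOT the Beam state API) of per-key "latest value"
 * state keyed by ever-increasing row ids. All names here are hypothetical.
 */
final class KeyedStateGrowth {

    /** Hypothetical per-key entry: latest value plus the minute it was last updated. */
    static final class Entry {
        final String value;
        final long lastUpdateMinute;

        Entry(String value, long lastUpdateMinute) {
            this.value = value;
            this.lastUpdateMinute = lastUpdateMinute;
        }
    }

    private final Map<Long, Entry> state = new HashMap<>();
    // A TTL <= 0 means "never clear", which mirrors the pipeline described above.
    private final long ttlMinutes;

    KeyedStateGrowth(long ttlMinutes) {
        this.ttlMinutes = ttlMinutes;
    }

    /** Stores the latest value for a row id, overwriting any older value. */
    void update(long rowId, String value, long nowMinute) {
        state.put(rowId, new Entry(value, nowMinute));
    }

    /** Stand-in for a cleanup timer: drops keys idle for longer than the TTL. */
    void expire(long nowMinute) {
        if (ttlMinutes <= 0) {
            return; // no cleanup at all: every key ever seen is kept
        }
        state.values().removeIf(e -> nowMinute - e.lastUpdateMinute > ttlMinutes);
    }

    int size() {
        return state.size();
    }

    /** Feeds `minutes` minutes of input with `perMinute` brand-new row ids per minute. */
    static int simulate(long ttlMinutes, int minutes, int perMinute) {
        KeyedStateGrowth s = new KeyedStateGrowth(ttlMinutes);
        long nextRowId = 0;
        for (long minute = 0; minute < minutes; minute++) {
            for (int i = 0; i < perMinute; i++) {
                s.update(nextRowId++, "v", minute);
            }
            s.expire(minute);
        }
        return s.size();
    }

    public static void main(String[] args) {
        // One simulated day at ~50 new keys per minute.
        System.out.println("no cleanup, 24h: " + simulate(0, 24 * 60, 50));
        System.out.println("60-min TTL, 24h: " + simulate(60, 24 * 60, 50));
    }
}
```

After one simulated day the uncleared map holds 72000 entries and keeps growing by 50 per minute, while the TTL variant levels off at 3050 entries (61 minutes of keys). The trade-off is the one Stephen describes: once a key's state is cleared, a late, out-of-order version of that row can no longer be compared against the stored value, so any real TTL would need to exceed the longest lateness for which the pipeline still has to issue retractions.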