The RocksDB state backend for Flink offers incremental checkpointing (which I have configured). However, I don't think the state itself is the problem. The example I posted earlier has no usage of state except for the CoGroupByKey, which should be cleared on each firing, and only a constant number of keys (1), so we shouldn't have any growing state at all in that example. If the state itself (or my usage of it) were the problem, I wouldn't expect my Watch-based SQS source to change the equation, yet the Watch-based source has been stable for over a week now.
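
Roughly, the cogroup portion of that example is shaped like the sketch below (a simplified sketch with placeholder names and String payloads, not the actual gist code):

// Simplified sketch of the windowing/trigger shape used in the example
// (placeholder names and String payloads, not the actual gist code).
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

public class CoGroupSketch {

  static PCollection<KV<String, CoGbkResult>> coGroup(
      PCollection<KV<String, String>> queueA,
      PCollection<KV<String, String>> queueB) {

    // Global window, repeated processing-time trigger delayed by one minute,
    // discarding fired panes: the cogroup buffer is emptied at every firing.
    Window<KV<String, String>> window =
        Window.<KV<String, String>>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes();

    TupleTag<String> tagA = new TupleTag<String>();
    TupleTag<String> tagB = new TupleTag<String>();

    // The inputs are assumed to already be keyed by a single constant key,
    // so the keyspace never grows.
    return KeyedPCollectionTuple.of(tagA, queueA.apply("WindowA", window))
        .and(tagB, queueB.apply("WindowB", window))
        .apply(CoGroupByKey.<String>create());
  }
}

With discarding fired panes and a single constant key, the only state the runner should need to hold for this step is the buffer for the current pane.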
Stephen

On Fri, Jan 17, 2020 at 3:16 PM Kenneth Knowles <[email protected]> wrote:
>
> On Fri, Jan 17, 2020 at 11:44 AM Stephen Patel <[email protected]>
> wrote:
>>
>> * What is the space of keys? Is it constantly growing?
>>
>> The keys are effectively database row ids so they're always growing,
>> or being updated if the rows are modified. Each row is small.
>>
>> * Are you explicitly clearing out state from stale keys?
>>
>> No, once a key is entered, I keep the value for it until it's updated
>> with a new value. This is so that if the value changes, or we get an
>> out of order version of that value, we can issue the appropriate
>> changes/retractions.
>
> In this case your state will continue to grow without bound. I don't know
> about Flink's details. If checkpoints are incremental then maybe this is no
> problem? But maybe it is.
>
> Kenn
>
>>
>> I can share a small variant of the code that doesn't involve any
>> statefulness (apart from the implicit and temporary statefulness of
>> the cogroup/deduplication operator):
>> https://gist.github.com/spatel11/ced0d175ca64962e0ec734a857d1ef33
>>
>> This code can create two types of pipelines. One fills two SQS queues
>> at different rates; the other reads from those queues, cogroups them,
>> and generates some metrics. The read pipeline will operate fine for a
>> few hours before the checkpoint durations begin to grow.
>>
>> I've also included the flink-conf that I was using.
>>
>> Stephen
>>
>> On Fri, Jan 17, 2020 at 11:47 AM Kenneth Knowles <[email protected]> wrote:
>> >
>> > Starting with just checking some basic things: What is the space of keys?
>> > Is it constantly growing? Are you explicitly clearing out state from stale
>> > keys? In the global window, you don't get any state GC for free.
>> >
>> > Can you share repro code?
>> >
>> > Kenn
>> >
>> > On Fri, Jan 17, 2020 at 8:53 AM Stephen Patel <[email protected]>
>> > wrote:
>> >>
>> >> I've got a Beam pipeline using the FlinkRunner that reads from two
>> >> different SQS sources (using the SqsIO). It does some stateful
>> >> processing on each stream, and then cogroups the results together to
>> >> generate a result and write it to SNS (using the SnsIO). The volume
>> >> of input data isn't particularly large (about 50 messages per minute
>> >> on one queue, and about 1 message per minute on the other queue).
>> >> It's using the global window, discarding fired panes, with a
>> >> processing-time trigger delayed by 1 minute. Checkpointing is enabled
>> >> at a 1 minute interval, with a minimum delay between checkpoints of 30
>> >> seconds. My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
>> >> predefined options.
>> >>
>> >> This pipeline runs fine for a few hours with an average checkpoint
>> >> duration of 1s (with occasional spikes higher), but eventually the
>> >> time it takes to checkpoint begins to grow until it's in the minutes
>> >> on average, and finally it won't even complete within a 10 minute
>> >> period. I'm using a parallelism of 2, and it keeps up with the
>> >> number of incoming messages just fine (until the checkpoint duration
>> >> grows too large and it is unable to delete the messages any longer).
>> >> To try to isolate the problem, I wrote an alternate SQS reader that
>> >> uses the Watch transform to periodically read from SQS. This variant
>> >> doesn't show the same behavior, and has been running for a week
>> >> without issue (an average checkpoint time of 1-2s).
>> >>
>> >> Some other experiments I tried:
>> >>
>> >> * I observed that the operators that took a long time to checkpoint
>> >> were the deduplicating operators after the actual unbounded source
>> >> operator. I disabled requiresDeduping and added a Reshuffle instead,
>> >> however that exhibited the same growth in checkpoint durations after a
>> >> period of time.
>> >> * I tried with the AT_LEAST_ONCE checkpointing mode instead of exactly
>> >> once, however that also exhibited the same behavior.
>> >>
>> >> Does anyone have any thoughts about what might cause this behavior
>> >> with the UnboundedSource (as opposed to the splittable DoFn variant)?
>> >>
>> >> I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.
>> >>
>> >> Stephen
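
To make the state discussion above concrete: the stateful piece of the full pipeline (which the gist deliberately leaves out) keeps the latest value per row id along these lines. This is a simplified sketch with placeholder names and String payloads rather than the real code, and as Kenn points out, nothing in it ever clears the per-key state:

// Simplified sketch of the keep-latest-value-per-key pattern described above.
// Names and value types are placeholders; the real pipeline keys on database row ids.
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class KeepLatestFn extends DoFn<KV<String, String>, KV<String, String>> {

  @StateId("latest")
  private final StateSpec<ValueState<String>> latestSpec =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("latest") ValueState<String> latest,
      OutputReceiver<KV<String, String>> out) {
    String previous = latest.read();
    if (previous == null || !previous.equals(element.getValue())) {
      // Emit the change (and, in the real pipeline, a retraction of the previous value).
      out.output(element);
      latest.write(element.getValue());
    }
    // Note: latest.clear() is never called, so in the global window this state
    // holds an entry for every row id ever seen -- the unbounded growth Kenn describes.
  }
}

Again, though, the gist reproduces the checkpoint growth without this DoFn at all, which is why I don't believe this state is the culprit.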
