The RocksDB state backend for Flink offers incremental checkpointing
(which I have configured).  However, I don't think the state itself is
the problem.  The example I posted earlier uses no state except for
the CoGroupByKey, which should be cleared on each firing, and it has
only a constant number of keys (1).  So we shouldn't have any growing
state at all in the example.  If the state itself (or my usage of it)
were the problem, then I wouldn't expect my Watch-based SQS source to
change the equation, yet the Watch-based source has been stable for
over a week now.

Stephen
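
For readers following the thread, the state backend settings described
here (RocksDB, incremental checkpoints, FLASH_SSD_OPTIMIZED) map roughly
onto the flink-conf.yaml entries below.  This is a sketch under
assumptions, not the flink-conf from the gist: the checkpoint directory
is a made-up placeholder, and the key names should be checked against
the configuration reference for the Flink version in use.  The
checkpoint interval and minimum pause mentioned in the thread are
configured elsewhere and are not shown.

```yaml
# Sketch only; not the actual flink-conf shared in the gist.
state.backend: rocksdb
# Upload only the RocksDB files that changed since the last checkpoint.
state.backend.incremental: true
# Predefined option set named in the thread.
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
# Placeholder path (assumption); substitute a durable location.
state.checkpoints.dir: s3://example-bucket/flink/checkpoints
```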

On Fri, Jan 17, 2020 at 3:16 PM Kenneth Knowles <[email protected]> wrote:
>
>
>
> On Fri, Jan 17, 2020 at 11:44 AM Stephen Patel <[email protected]> 
> wrote:
>>
>> * What is the space of keys? Is it constantly growing?
>>
>> The keys are effectively database row ids so they're always growing,
>> or being updated if the rows are modified.  Each row is small.
>>
>> * Are you explicitly clearing out state from stale keys?
>>
>> No, once a key is entered, I keep the value for it until it's updated
>> with a new value.  This is so that if the value changes, or we get an
>> out of order version of that value, we can issue the appropriate
>> changes/retractions.
>
>
> In this case your state will continue to grow without bound. I don't know 
> about Flink's details. If checkpoints are incremental then maybe this is no 
> problem? But maybe it is.
>
> Kenn
>
>>
>> I can share a small variant of the code that doesn't involve any
>> statefulness (apart from the implicit and temporary statefulness of
>> the cogroup/deduplication operator):
>> https://gist.github.com/spatel11/ced0d175ca64962e0ec734a857d1ef33
>>
>> This code can create two types of pipelines.  One fills two SQS queues
>> at different rates; the other reads from those queues, cogroups them,
>> and generates some metrics.  The read pipeline will operate fine for.
>>
>> I've also included the flink-conf that I was using.
>>
>> Stephen
>>
>> On Fri, Jan 17, 2020 at 11:47 AM Kenneth Knowles <[email protected]> wrote:
>> >
>> > Starting with just checking some basic things: What is the space of keys? 
>> > Is it constantly growing? Are you explicitly clearing out state from stale 
>> > keys? In the global window, you don't get any state GC for free.
>> >
>> > Can you share repro code?
>> >
>> > Kenn
>> >
>> > On Fri, Jan 17, 2020 at 8:53 AM Stephen Patel <[email protected]> 
>> > wrote:
>> >>
>> >> I've got a Beam pipeline using the FlinkRunner that reads from two
>> >> different SQS sources (using the SqsIO).  It does some stateful
>> >> processing on each stream, and then cogroups the results together to
>> >> generate a result and write it to SNS (using the SnsIO).  The volume
>> >> of input data isn't particularly large (about 50 messages per minute
>> >> on one queue, and about 1 message per minute on the other queue).
>> >> It's using the Global window, discarding fired panes, with a
>> >> processing time trigger delayed by 1 minute.  Checkpointing is enabled
>> >> at a 1 minute interval, with a minimum delay between checkpoints of 30
>> >> seconds.  My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
>> >> predefined options.
>> >>
>> >> This pipeline runs fine for a few hours with an average checkpoint
>> >> duration of 1s (with occasional spikes higher), but eventually the
>> >> time it takes to checkpoint begins to grow until it's in the minutes
>> >> on average, and finally it won't even complete within a 10 minute
>> >> period.  I'm running with a parallelism of 2, and it keeps up with the
>> >> number of incoming messages just fine (until the checkpoint duration
>> >> grows too large and it is unable to delete the messages any longer).
>> >> To try to isolate the problem, I wrote an alternate SQS reader that
>> >> uses the Watch transform to periodically read from SQS.  This variant
>> >> doesn't show the same behavior, and has been running for a week
>> >> without issue (an average checkpoint time of 1-2s).
>> >>
>> >> Some other experiments I tried:
>> >>
>> >> * I observed that the operators that took a long time to checkpoint
>> >> were the deduplicating operators after the actual unbounded source
>> >> operator.  I disabled requiresDeduping and added a Reshuffle instead,
>> >> however that exhibited the same growth in checkpoint durations after a
>> >> period of time.
>> >> * I tried the AT_LEAST_ONCE checkpointing mode instead of
>> >> EXACTLY_ONCE; however, that also exhibited the same behavior.
>> >>
>> >>
>> >> Does anyone have any thoughts about what might cause this behavior
>> >> with the UnboundedSource (as opposed to the splittable DoFn variant)?
>> >>
>> >> I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.
>> >>
>> >> Stephen
