Re: Escalating Checkpoint durations in Flink with SQS Source

2020-01-17 Thread Stephen Patel
The RocksDB state backend for Flink offers incremental checkpointing
(which I have configured).  However, I don't think the state itself is
a problem.  The example I posted earlier has no usage of state except
for the CoGroupByKey, which should be cleared on each firing, and only
a constant number of keys (1).  So we shouldn't have any growing state
at all in the example.  If the state itself (or my usage of it) were
the problem, then I wouldn't expect my Watch-based SQS source to
change the equation, yet the Watch-based source has been stable
for over a week now.
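
For reference, this is roughly the setup I'm describing, expressed
against Flink's Java API rather than the flink-conf/pipeline options I
actually use; the checkpoint URI below is just a placeholder:

    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints enabled;
        // the checkpoint URI is a placeholder.
        RocksDBStateBackend backend =
            new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
        backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
        env.setStateBackend(backend);

        // Checkpoint every minute, with at least 30 seconds between checkpoints.
        env.enableCheckpointing(60_000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
      }
    }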

Stephen

On Fri, Jan 17, 2020 at 3:16 PM Kenneth Knowles  wrote:
>
>
>
> On Fri, Jan 17, 2020 at 11:44 AM Stephen Patel  
> wrote:
>>
>> * What is the space of keys? Is it constantly growing?
>>
>> The keys are effectively database row ids so they're always growing,
>> or being updated if the rows are modified.  Each row is small.
>>
>> * Are you explicitly clearing out state from stale keys?
>>
>> No, once a key is entered, I keep the value for it until it's updated
>> with a new value.  This is so that if the value changes, or we get an
>> out of order version of that value, we can issue the appropriate
>> changes/retractions.
>
>
> In this case your state will continue to grow without bound. I don't know 
> about Flink's details. If checkpoints are incremental then maybe this is no 
> problem? But maybe it is.
>
> Kenn
>
>>
>> I can share a small variant of the code that doesn't involve any
>> statefulness (apart from the implicit and temporary statefulness of
>> the cogroup/deduplication operator):
>> https://gist.github.com/spatel11/ced0d175ca64962e0ec734a857d1ef33
>>
>> This code can create two types of pipelines.  One fills two sqs queues
>> at different rates, the other reads from those queues, cogroups them,
>> and generates some metrics.  The read pipeline will operate fine for a
>> few hours before the checkpoint durations begin to grow.
>>
>> I've also included the flink-conf that I was using.
>>
>> Stephen
>>
>> On Fri, Jan 17, 2020 at 11:47 AM Kenneth Knowles  wrote:
>> >
>> > Starting with just checking some basic things: What is the space of keys? 
>> > Is it constantly growing? Are you explicitly clearing out state from stale 
>> > keys? In the global window, you don't get any state GC for free.
>> >
>> > Can you share repro code?
>> >
>> > Kenn
>> >
>> > On Fri, Jan 17, 2020 at 8:53 AM Stephen Patel  
>> > wrote:
>> >>
>> >> I've got a beam pipeline using the FlinkRunner that reads from two
>> >> different SQS sources (using the SqsIO).  It does some stateful
>> >> processing on each stream, and then cogroups the results together to
>> >> generate a result and write it to Sns (using the SnsIO).  The volume
>> >> of input data isn't particularly large (about 50 messages per minute
>> >> on one queue, and about 1 message per minute on the other queue).
>> >> It's using the Global window, discarding fired panes, with a
>> >> processing time trigger delayed by 1 minute.  Checkpointing is enabled
>> >> at a 1 minute interval, with a minimum delay between checkpoints of 30
>> >> seconds.  My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
>> >> predefined options.
>> >>
>> >> This pipeline runs fine for a few hours with an average checkpoint
>> >> duration of 1s (with occasional spikes higher), but eventually the
>> >> time it takes to checkpoint begins to grow until it's in the minutes
>> >> on average, and finally it won't even complete within a 10 minute
>> >> period.  I'm using a parallelism of 2, and it seems to keep up with the
>> >> number of incoming messages just fine (until the checkpoint duration
>> >> grows too large and it is unable to delete the messages any longer).
>> >> To try to isolate the problem, I wrote an alternate sqs reader that
>> >> uses the Watch transform to periodically read from SQS.  This variant
>> >> doesn't show the same behavior, and has been running for a week
>> >> without issue (an average checkpoint time of 1-2s).
>> >>
>> >> Some other experiments I tried:
>> >>
>> >> * I observed that the operators that took a long time to checkpoint
>> >> were the deduplicating operators after the actual unbounded source
>> >> operator.  I disabled requiresDeduping and added a Reshuffle instead,
>> >> however that exhibited the same growth in checkpoint durations after a
>> >> period of time.
>> >> * I tried with the AT_LEAST_ONCE checkpointing mode instead of exactly
>> >> once, however that also exhibited the same behavior.
>> >>
>> >>
>> >> Does anyone have any thoughts about what might cause this behavior
>> >> with the UnboundedSource (as opposed to the splittable DoFn variant)?
>> >>
>> >> I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.
>> >>
>> >> Stephen


Re: Escalating Checkpoint durations in Flink with SQS Source

2020-01-17 Thread Kenneth Knowles
On Fri, Jan 17, 2020 at 11:44 AM Stephen Patel 
wrote:

> * What is the space of keys? Is it constantly growing?
>
> The keys are effectively database row ids so they're always growing,
> or being updated if the rows are modified.  Each row is small.
>
> * Are you explicitly clearing out state from stale keys?
>
> No, once a key is entered, I keep the value for it until it's updated
> with a new value.  This is so that if the value changes, or we get an
> out of order version of that value, we can issue the appropriate
> changes/retractions.
>

In this case your state will continue to grow without bound. I don't know
about Flink's details. If checkpoints are incremental then maybe this is no
problem? But maybe it is.
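
If you do want to bound it, the usual trick is a per-key timer that
clears state for keys that go quiet. A rough sketch (the element types
and the 30-day horizon here are made up, not taken from your pipeline):

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    // Keep the latest value per key, but drop it once the key has been
    // idle for 30 days (the horizon is arbitrary for the sketch).
    class ExpiringLatestValueFn extends DoFn<KV<String, String>, KV<String, String>> {

      @StateId("latest")
      private final StateSpec<ValueState<String>> latestSpec =
          StateSpecs.value(StringUtf8Coder.of());

      @TimerId("expiry")
      private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void process(
          ProcessContext ctx,
          @StateId("latest") ValueState<String> latest,
          @TimerId("expiry") Timer expiry) {
        latest.write(ctx.element().getValue());
        // Push the expiry out every time the key is touched.
        expiry.offset(Duration.standardDays(30)).setRelative();
        ctx.output(ctx.element());
      }

      @OnTimer("expiry")
      public void onExpiry(@StateId("latest") ValueState<String> latest) {
        // Clear the per-key value once the key has been idle long enough.
        latest.clear();
      }
    }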

Kenn


> I can share a small variant of the code that doesn't involve any
> statefulness (apart from the implicit and temporary statefulness of
> the cogroup/deduplication operator):
> https://gist.github.com/spatel11/ced0d175ca64962e0ec734a857d1ef33
>
> This code can create two types of pipelines.  One fills two sqs queues
> at different rates, the other reads from those queues, cogroups them,
> and generates some metrics.  The read pipeline will operate fine for a
> few hours before the checkpoint durations begin to grow.
>
> I've also included the flink-conf that I was using.
>
> Stephen
>
> On Fri, Jan 17, 2020 at 11:47 AM Kenneth Knowles  wrote:
> >
> > Starting with just checking some basic things: What is the space of
> keys? Is it constantly growing? Are you explicitly clearing out state from
> stale keys? In the global window, you don't get any state GC for free.
> >
> > Can you share repro code?
> >
> > Kenn
> >
> > On Fri, Jan 17, 2020 at 8:53 AM Stephen Patel 
> wrote:
> >>
> >> I've got a beam pipeline using the FlinkRunner that reads from two
> >> different SQS sources (using the SqsIO).  It does some stateful
> >> processing on each stream, and then cogroups the results together to
> >> generate a result and write it to Sns (using the SnsIO).  The volume
> >> of input data isn't particularly large (about 50 messages per minute
> >> on one queue, and about 1 message per minute on the other queue).
> >> It's using the Global window, discarding fired panes, with a
> >> processing time trigger delayed by 1 minute.  Checkpointing is enabled
> >> at a 1 minute interval, with a minimum delay between checkpoints of 30
> >> seconds.  My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
> >> predefined options.
> >>
> >> This pipeline runs fine for a few hours with an average checkpoint
> >> duration of 1s (with occasional spikes higher), but eventually the
> >> time it takes to checkpoint begins to grow until it's in the minutes
> >> on average, and finally it won't even complete within a 10 minute
> >> period.  I'm using a parallelism of 2, and it seems to keep up with the
> >> number of incoming messages just fine (until the checkpoint duration
> >> grows too large and it is unable to delete the messages any longer).
> >> To try to isolate the problem, I wrote an alternate sqs reader that
> >> uses the Watch transform to periodically read from SQS.  This variant
> >> doesn't show the same behavior, and has been running for a week
> >> without issue (an average checkpoint time of 1-2s).
> >>
> >> Some other experiments I tried:
> >>
> >> * I observed that the operators that took a long time to checkpoint
> >> were the deduplicating operators after the actual unbounded source
> >> operator.  I disabled requiresDeduping and added a Reshuffle instead,
> >> however that exhibited the same growth in checkpoint durations after a
> >> period of time.
> >> * I tried with the AT_LEAST_ONCE checkpointing mode instead of exactly
> >> once, however that also exhibited the same behavior.
> >>
> >>
> >> Does anyone have any thoughts about what might cause this behavior
> >> with the UnboundedSource (as opposed to the splittable DoFn variant)?
> >>
> >> I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.
> >>
> >> Stephen
>


Re: Escalating Checkpoint durations in Flink with SQS Source

2020-01-17 Thread Stephen Patel
* What is the space of keys? Is it constantly growing?

The keys are effectively database row ids so they're always growing,
or being updated if the rows are modified.  Each row is small.

* Are you explicitly clearing out state from stale keys?

No, once a key is entered, I keep the value for it until it's updated
with a new value.  This is so that if the value changes, or we get an
out of order version of that value, we can issue the appropriate
changes/retractions.

I can share a small variant of the code that doesn't involve any
statefulness (apart from the implicit and temporary statefulness of
the cogroup/deduplication operator):
https://gist.github.com/spatel11/ced0d175ca64962e0ec734a857d1ef33

This code can create two types of pipelines.  One fills two sqs queues
at different rates, the other reads from those queues, cogroups them,
and generates some metrics.  The read pipeline will operate fine for a
few hours before the checkpoint durations begin to grow.
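
The cogroup in the read pipeline is just the stock transform, roughly
like the following (the tag names and element types here are simplified
stand-ins for what the gist actually does):

    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;

    public class CoGroupSketch {
      // Join the two (already keyed and windowed) queue streams by key.
      static PCollection<KV<String, CoGbkResult>> cogroup(
          PCollection<KV<String, String>> fastQueue,
          PCollection<KV<String, String>> slowQueue) {
        TupleTag<String> fastTag = new TupleTag<String>() {};
        TupleTag<String> slowTag = new TupleTag<String>() {};
        return KeyedPCollectionTuple.of(fastTag, fastQueue)
            .and(slowTag, slowQueue)
            .apply(CoGroupByKey.<String>create());
      }
    }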

I've also included the flink-conf that I was using.

Stephen

On Fri, Jan 17, 2020 at 11:47 AM Kenneth Knowles  wrote:
>
> Starting with just checking some basic things: What is the space of keys? Is 
> it constantly growing? Are you explicitly clearing out state from stale keys? 
> In the global window, you don't get any state GC for free.
>
> Can you share repro code?
>
> Kenn
>
> On Fri, Jan 17, 2020 at 8:53 AM Stephen Patel  
> wrote:
>>
>> I've got a beam pipeline using the FlinkRunner that reads from two
>> different SQS sources (using the SqsIO).  It does some stateful
>> processing on each stream, and then cogroups the results together to
>> generate a result and write it to Sns (using the SnsIO).  The volume
>> of input data isn't particularly large (about 50 messages per minute
>> on one queue, and about 1 message per minute on the other queue).
>> It's using the Global window, discarding fired panes, with a
>> processing time trigger delayed by 1 minute.  Checkpointing is enabled
>> at a 1 minute interval, with a minimum delay between checkpoints of 30
>> seconds.  My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
>> predefined options.
>>
>> This pipeline runs fine for a few hours with an average checkpoint
>> duration of 1s (with occasional spikes higher), but eventually the
>> time it takes to checkpoint begins to grow until it's in the minutes
>> on average, and finally it won't even complete within a 10 minute
>> period.  I'm using a parallelism of 2, and it seems to keep up with the
>> number of incoming messages just fine (until the checkpoint duration
>> grows too large and it is unable to delete the messages any longer).
>> To try to isolate the problem, I wrote an alternate sqs reader that
>> uses the Watch transform to periodically read from SQS.  This variant
>> doesn't show the same behavior, and has been running for a week
>> without issue (an average checkpoint time of 1-2s).
>>
>> Some other experiments I tried:
>>
>> * I observed that the operators that took a long time to checkpoint
>> were the deduplicating operators after the actual unbounded source
>> operator.  I disabled requiresDeduping and added a Reshuffle instead,
>> however that exhibited the same growth in checkpoint durations after a
>> period of time.
>> * I tried with the AT_LEAST_ONCE checkpointing mode instead of exactly
>> once, however that also exhibited the same behavior.
>>
>>
>> Does anyone have any thoughts about what might cause this behavior
>> with the UnboundedSource (as opposed to the splittable DoFn variant)?
>>
>> I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.
>>
>> Stephen


Re: Escalating Checkpoint durations in Flink with SQS Source

2020-01-17 Thread Kenneth Knowles
Starting with just checking some basic things: What is the space of keys?
Is it constantly growing? Are you explicitly clearing out state from stale
keys? In the global window, you don't get any state GC for free.

Can you share repro code?

Kenn

On Fri, Jan 17, 2020 at 8:53 AM Stephen Patel 
wrote:

> I've got a beam pipeline using the FlinkRunner that reads from two
> different SQS sources (using the SqsIO).  It does some stateful
> processing on each stream, and then cogroups the results together to
> generate a result and write it to Sns (using the SnsIO).  The volume
> of input data isn't particularly large (about 50 messages per minute
> on one queue, and about 1 message per minute on the other queue).
> It's using the Global window, discarding fired panes, with a
> processing time trigger delayed by 1 minute.  Checkpointing is enabled
> at a 1 minute interval, with a minimum delay between checkpoints of 30
> seconds.  My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
> predefined options.
>
> This pipeline runs fine for a few hours with an average checkpoint
> duration of 1s (with occasional spikes higher), but eventually the
> time it takes to checkpoint begins to grow until it's in the minutes
> on average, and finally it won't even complete within a 10 minute
> period.  I'm using a parallelism of 2, and it seems to keep up with the
> number of incoming messages just fine (until the checkpoint duration
> grows too large and it is unable to delete the messages any longer).
> To try to isolate the problem, I wrote an alternate sqs reader that
> uses the Watch transform to periodically read from SQS.  This variant
> doesn't show the same behavior, and has been running for a week
> without issue (an average checkpoint time of 1-2s).
>
> Some other experiments I tried:
>
> * I observed that the operators that took a long time to checkpoint
> were the deduplicating operators after the actual unbounded source
> operator.  I disabled requiresDeduping and added a Reshuffle instead,
> however that exhibited the same growth in checkpoint durations after a
> period of time.
> * I tried with the AT_LEAST_ONCE checkpointing mode instead of exactly
> once, however that also exhibited the same behavior.
>
>
> Does anyone have any thoughts about what might cause this behavior
> with the UnboundedSource (as opposed to the splittable DoFn variant)?
>
> I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.
>
> Stephen
>


Escalating Checkpoint durations in Flink with SQS Source

2020-01-17 Thread Stephen Patel
I've got a Beam pipeline using the FlinkRunner that reads from two
different SQS sources (using the SqsIO).  It does some stateful
processing on each stream, and then cogroups the results together to
generate a result and write it to SNS (using the SnsIO).  The volume
of input data isn't particularly large (about 50 messages per minute
on one queue, and about 1 message per minute on the other queue).
It's using the global window, discarding fired panes, with a
processing-time trigger delayed by 1 minute.  Checkpointing is enabled
at a 1-minute interval, with a minimum delay between checkpoints of 30
seconds.  My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
predefined options.
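
Concretely, the windowing strategy is roughly the following (element
type left generic):

    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    public class WindowingSketch {
      // Global window, discarding fired panes, firing on processing time
      // one minute after the first element in each pane.
      public static <T> Window<T> strategy() {
        return Window.<T>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes();
      }
    }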

This pipeline runs fine for a few hours with an average checkpoint
duration of 1s (with occasional spikes higher), but eventually the
time it takes to checkpoint begins to grow until it's in the minutes
on average, and finally it won't even complete within a 10 minute
period.  I'm using a parallelism of 2, and it seems to keep up with the
number of incoming messages just fine (until the checkpoint duration
grows too large and it is unable to delete the messages any longer).
To try to isolate the problem, I wrote an alternate SQS reader that
uses the Watch transform to periodically read from SQS.  This variant
doesn't show the same behavior, and has been running for a week
without issue (an average checkpoint time of 1-2s).
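
The idea behind the Watch-based reader is roughly the following
(simplified to message bodies only; the client handling and names here
are illustrative, not the actual code):

    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
    import com.amazonaws.services.sqs.model.Message;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.Watch;
    import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
    import org.joda.time.Instant;

    // Poll the queue on an interval and emit message bodies; Watch
    // deduplicates outputs it has already seen for a given input.
    class SqsPollFn extends Watch.Growth.PollFn<String, String> {
      @Override
      public PollResult<String> apply(String queueUrl, Context c) {
        AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
        List<String> bodies = new ArrayList<>();
        for (Message m : sqs.receiveMessage(queueUrl).getMessages()) {
          bodies.add(m.getBody());
        }
        // Never "complete": keep polling the queue indefinitely.
        return PollResult.incomplete(Instant.now(), bodies);
      }
    }

    // Applied along the lines of:
    //   pipeline
    //       .apply(Create.of(queueUrl))
    //       .apply(Watch.growthOf(new SqsPollFn())
    //           .withPollInterval(Duration.standardSeconds(30)));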

Some other experiments I tried:

* I observed that the operators that took a long time to checkpoint
were the deduplicating operators after the actual unbounded source
operator.  I disabled requiresDeduping and added a Reshuffle instead
(roughly as in the sketch after this list); however, that exhibited the
same growth in checkpoint durations after a period of time.
* I tried the AT_LEAST_ONCE checkpointing mode instead of EXACTLY_ONCE;
however, that also exhibited the same behavior.
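
For the first experiment, the Reshuffle part amounts to something like
the sketch below (the queue URL is a placeholder, and the
requiresDeduping change itself isn't shown since it means a patched
copy of the SQS source):

    import com.amazonaws.services.sqs.model.Message;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.aws.sqs.SqsIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    public class ReshuffleExperimentSketch {
      public static void main(String[] args) {
        Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<Message> messages =
            pipeline
                .apply(SqsIO.read().withQueueUrl(
                    "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"))
                // Redistribute after the read instead of relying on the
                // source's requiresDeduping.
                .apply(Reshuffle.viaRandomKey());

        pipeline.run();
      }
    }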


Does anyone have any thoughts about what might cause this behavior
with the UnboundedSource (as opposed to the splittable DoFn variant)?

I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.

Stephen