Hi Aljoscha,

Thanks for the reply!

I found that my stateful operator (with parallelism 10) wasn't split
equally between the task managers on the two nodes (it was split 9/1), so
I tweaked the task manager / slot configuration until Flink allocated them
equally, with 5 instances of the operator on each node. (Just wondering if
there's a better way to get Flink to allocate this specific operator
equally between nodes, regardless of the number of slots available on
each?) Having split the stateful operator equally between the 2 nodes, I'm
now able to checkpoint 18.5GB of state in ~4 minutes, which indicates an
overall throughput of ~77MB/sec (~38.5MB/sec per node).
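
For reference, this is roughly how I ended up sizing the YARN session (the
memory value is just what fits our instances, so treat it as an example):

    ./bin/yarn-session.sh -n 2 -s 5 -tm 4096

i.e. 2 TaskManager containers with 5 slots each, which is what got the 10
instances of the operator placed 5 per node - though as far as I know YARN
doesn't actually guarantee the two containers land on different nodes,
which is partly why I'm asking if there's a better way.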

I did what you said and tried uploading a large file from one of those VMs
to S3 using the AWS command line tool. It uploaded at ~76MB/sec, which is
nearly double the ~38.5MB/sec per node I'm seeing from Flink, but at least
it's not orders of magnitude out. Does that sound ok? I guess there's more
going on when Flink takes a checkpoint than just the upload anyway... I
upgraded my cluster to Flink 1.2-SNAPSHOT yesterday, so I should be using
the fully async mode.
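
In case the exact test is useful to anyone: it was just something like
this (the file size, path and bucket name are placeholders):

    # create a 1GB test file and time the upload to S3
    dd if=/dev/zero of=/tmp/test.bin bs=1M count=1024
    time aws s3 cp /tmp/test.bin s3://<my-bucket>/test.bin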
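
And for completeness, this is roughly how the backend was set up when I
enabled the fully async mode on 1.1 (the checkpoint URI is a placeholder;
my understanding is that fully async snapshots are the default for RocksDB
on 1.2-SNAPSHOT):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // placeholder checkpoint URI - point this at your bucket
            RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://<my-bucket>/checkpoints");
            // switches RocksDB snapshots to fully async mode (Flink 1.1 API)
            backend.enableFullyAsyncSnapshots();
            env.setStateBackend(backend);
        }
    }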

I'll have a proper look in the logs if I see it crash again, and for now
will just add more nodes whenever we need to speed up the checkpointing.

Thanks,
Josh

On Tue, Oct 25, 2016 at 3:12 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Josh,
> Checkpoints that take longer than the checkpoint interval should not be an
> issue (if you use an up-to-date version of Flink). The checkpoint
> coordinator will not issue another checkpoint while another one is still
> ongoing. Is there maybe some additional data for the crashes? A log perhaps?
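>
> For what it's worth, the relevant knobs look something like this; the max
> number of concurrent checkpoints defaults to 1, and you can additionally
> enforce a minimum pause between checkpoints:
>
>     env.enableCheckpointing(10 * 60 * 1000); // e.g. your 10 min interval
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // default
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30 * 1000);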
>
> Regarding upload speed: yes, each instance of an operator is responsible
> for uploading its state, so if state is equally distributed between the
> operators on the TaskManagers, each TaskManager would upload roughly the
> same amount of state. It might be interesting to see what the raw upload
> speed is when you have those two VMs upload to S3; if it is a lot larger
> than the speed you're seeing, something would be wrong and we should
> investigate. One last thing: are you using the "fully async" mode of
> RocksDB? I think I remember that you do, just checking.
>
> If it is indeed a problem of upload speed to S3 per machine then yes,
> using more instances should speed up checkpointing.
>
> About incremental checkpoints: they're not going to make it into 1.2 with
> the current planning but after that, I don't know yet.
>
> Cheers,
> Aljoscha
>
>
> On Mon, 24 Oct 2016 at 19:06 Josh <jof...@gmail.com> wrote:
>
> Hi all,
>
> I'm running Flink on EMR/YARN with 2x m3.xlarge instances and am
> checkpointing a fairly large RocksDB state to S3.
>
> I've found that when the state size hits 10GB, the checkpoint takes around
> 6 minutes, according to the Flink dashboard. Originally my checkpoint
> interval was 5 minutes for the job, but I've found that the YARN container
> crashes (I guess because the checkpoint time is greater than the checkpoint
> interval), so I've now decreased the checkpoint frequency to every 10
> minutes.
>
> I was just wondering if anyone has any tips about how to reduce the
> checkpoint time. Taking 6 minutes to checkpoint ~10GB state means it's
> uploading at ~30MB/sec. I believe the m3.xlarge instances should have
> around 125MB/sec network bandwidth each, so I think the bottleneck is S3.
> Since there are 2 instances, I'm not sure if that means each instance is
> uploading at 15MB/sec - do the state uploads get shared equally among the
> instances, assuming the state is split equally between the task managers?
>
> If the state upload is split between the instances, perhaps the only way
> to speed up the checkpoints is to add more instances and task managers, and
> split the state equally among the task managers?
>
> Also just wondering - is there any chance the incremental checkpoints work
> will be complete any time soon?
>
> Thanks,
> Josh
>
>
