Hi Aljoscha,

Thanks for the reply!
I found that my stateful operator (with parallelism 10) wasn't split equally between the task managers on the two nodes (it was split 9/1), so I tweaked the task manager / slot configuration until Flink allocated them equally, with 5 instances of the operator on each node. (Just wondering - is there a better way to get Flink to allocate this specific operator equally between nodes, regardless of the number of slots available on each?)

Having split the stateful operator equally between the 2 nodes, I am actually able to checkpoint 18.5GB of state in ~4 minutes, which indicates an overall throughput of ~77MB/sec (~38.5MB/sec per node).

I did what you said and tried uploading a large file from one of those VMs to S3 using the AWS command line tool. It uploaded at ~76MB/sec, which is nearly double the ~38.5MB/sec per node, but at least it's not orders of magnitude out. Does that sound ok? I guess there's more going on when Flink takes a checkpoint than just the upload anyway...

I upgraded my cluster to Flink 1.2-SNAPSHOT yesterday, so yeah, I should be using the fully async mode. I'll have a proper look in the logs if I see it crash again, and for now will just add more nodes whenever we need to speed up the checkpointing. (I've pasted a rough sketch of the checkpointing / state backend setup below the quoted thread, in case it's useful.)

Thanks,
Josh

On Tue, Oct 25, 2016 at 3:12 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi Josh,
> Checkpoints that take longer than the checkpoint interval should not be an
> issue (if you use an up-to-date version of Flink). The checkpoint
> coordinator will not issue another checkpoint while another one is still
> ongoing. Is there maybe some additional data for the crashes? A log perhaps?
>
> Regarding upload speed, yes, each instance of an operator is responsible
> for uploading its state, so if state is equally distributed between
> operators on TaskManagers, each TaskManager would upload roughly the same
> amount of state. It might be interesting to see what the raw upload speed
> is when you have those two VMs upload to S3; if it is a lot larger than the
> speed you're seeing, something would be wrong and we should investigate.
> One last thing: are you using the "fully async" mode of RocksDB? I think I
> remember that you do, just checking.
>
> If it is indeed a problem of upload speed to S3 per machine then yes,
> using more instances should speed up checkpointing.
>
> About incremental checkpoints: they're not going to make it into 1.2 with
> the current planning, but after that, I don't know yet.
>
> Cheers,
> Aljoscha
>
>
> On Mon, 24 Oct 2016 at 19:06 Josh <jof...@gmail.com> wrote:
>
> Hi all,
>
> I'm running Flink on EMR/YARN with 2x m3.xlarge instances and am
> checkpointing a fairly large RocksDB state to S3.
>
> I've found that when the state size hits 10GB, the checkpoint takes around
> 6 minutes, according to the Flink dashboard. Originally my checkpoint
> interval was 5 minutes for the job, but I've found that the YARN container
> crashes (I guess because the checkpoint time is greater than the checkpoint
> interval), so I have now decreased the checkpoint frequency to every 10
> minutes.
>
> I was just wondering if anyone has any tips on how to reduce the
> checkpoint time. Taking 6 minutes to checkpoint ~10GB of state means it's
> uploading at ~30MB/sec. I believe the m3.xlarge instances should have
> around 125MB/sec network bandwidth each, so I think the bottleneck is S3.
> Since there are 2 instances, I'm not sure if that means each instance is
> uploading at 15MB/sec - do the state uploads get shared equally among the
> instances, assuming the state is split equally between the task managers?
>
> If the state upload is split between the instances, perhaps the only way
> to speed up the checkpoints is to add more instances and task managers, and
> split the state equally among the task managers?
>
> Also just wondering - is there any chance the incremental checkpoints work
> will be complete any time soon?
>
> Thanks,
> Josh
>
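P.S. The sketch mentioned above: this is roughly how the job's checkpointing and state backend are configured. It's only a minimal sketch - the S3 path, job name and the stateful operator in the comments are placeholders, and it assumes the Flink 1.1/1.2-era RocksDBStateBackend API, where enableFullyAsyncSnapshots() switches on the "fully async" mode.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 10 minutes; the checkpoint coordinator
        // will not start a new checkpoint while the previous one is still
        // in progress.
        env.enableCheckpointing(10 * 60 * 1000L);

        // RocksDB state backend writing snapshots to S3.
        // The bucket/path here is a placeholder.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://my-bucket/flink/checkpoints");

        // "Fully async" snapshots: the operator keeps processing while the
        // snapshot is materialised and uploaded to S3.
        backend.enableFullyAsyncSnapshots();

        env.setStateBackend(backend);

        // The stateful operator runs with parallelism 10. How its 10 subtasks
        // are spread across the two TaskManagers (ideally 5/5) depends on the
        // number of slots per TaskManager, not on anything set in this job.
        // e.g. events.keyBy(...).flatMap(new MyStatefulOperator()).setParallelism(10);

        // env.execute("my-job");
    }
}
```

The only thing the interval really has to satisfy is staying comfortably above the ~4 minutes a checkpoint currently takes; enableCheckpointing() just controls how often checkpoints are triggered, while the checkpoint duration itself is bounded by the per-node upload throughput to S3 discussed above.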