We're seeing our external checkpoints directory grow without bound after
upgrading to Flink 1.3. We are running Flink on Mesos.

In 1.2 (HA standalone mode), we saw (correctly) that only the latest external
checkpoint was retained, i.e. the default state.checkpoints.num-retained of 1
was respected.
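To make the expected 1.2 behaviour concrete, here is a minimal Java model of what a retention limit such as state.checkpoints.num-retained is meant to do: keep the newest N completed checkpoints and discard older ones as new ones complete. RetainedCheckpoints, add() and the discarded list are hypothetical names for illustration; this is a sketch of the idea, not Flink's own checkpoint store.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of "retain only the latest N checkpoints"; not Flink's implementation.
class RetainedCheckpoints {
    private final int maxRetained;
    private final Deque<Long> retained = new ArrayDeque<>();
    // IDs that would have been discarded (i.e. deleted from the checkpoint directory).
    final List<Long> discarded = new ArrayList<>();

    RetainedCheckpoints(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be >= 1");
        }
        this.maxRetained = maxRetained;
    }

    // Register a newly completed checkpoint and evict the oldest ones beyond the limit.
    void add(long checkpointId) {
        retained.addLast(checkpointId);
        while (retained.size() > maxRetained) {
            discarded.add(retained.removeFirst());
        }
    }

    List<Long> retainedIds() {
        return new ArrayList<>(retained);
    }

    public static void main(String[] args) {
        RetainedCheckpoints store = new RetainedCheckpoints(1); // the default of 1
        for (long id = 1; id <= 5; id++) {
            store.add(id);
        }
        System.out.println("retained=" + store.retainedIds() + " discarded=" + store.discarded);
        // prints: retained=[5] discarded=[1, 2, 3, 4]
    }
}
```

With a limit of 1, the directory should never hold more than one retained checkpoint per job; unbounded growth means the discard step is not happening (or not succeeding).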

The Mesos agent running the JobManager ends up with a very high load and
eventually becomes unresponsive. Interestingly, there is not much CPU or memory
pressure, which suggests to us that it is I/O- or network-bound, but nothing
jumps out at us in iostat/netstat. The biggest difference seems to be that
external checkpoints are not getting cleaned up/discarded. What might cause
that?
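One way to confirm that missing cleanup is the problem is to watch what accumulates in the externalized checkpoint directory over time. The sketch below lists the direct children of a directory, newest first, and sums their sizes. The default path is taken from the shell prompt in this message; CheckpointDirReport is a hypothetical name, and the code makes no assumption about how Flink names the entries it writes. Note that walking a very large directory on a network file system can itself be slow.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Diagnostic sketch: count and size the entries directly under a checkpoint directory.
class CheckpointDirReport {

    // Size in bytes of a file, or the total of all regular files under a directory.
    static long sizeOf(Path p) throws IOException {
        if (!Files.isDirectory(p)) {
            return Files.size(p);
        }
        try (Stream<Path> walk = Files.walk(p)) {
            return walk.filter(Files::isRegularFile).mapToLong(f -> {
                try {
                    return Files.size(f);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }).sum();
        }
    }

    // Direct children of dir, most recently modified first.
    static List<Path> entriesNewestFirst(Path dir) throws IOException {
        try (Stream<Path> children = Files.list(dir)) {
            return children.sorted(Comparator.comparingLong((Path p) -> {
                try {
                    return Files.getLastModifiedTime(p).toMillis();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }).reversed()).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args.length > 0 ? args[0] : "/mnt/shared/flink/ext-checkpoints");
        List<Path> entries = entriesNewestFirst(dir);
        long total = 0;
        for (Path p : entries) {
            total += sizeOf(p);
        }
        System.out.printf("%d entries, %d bytes total%n", entries.size(), total);
        // Show the ten newest entries.
        entries.stream().limit(10).forEach(p -> System.out.println("  " + p.getFileName()));
    }
}
```

Running it periodically (for example from cron) and comparing the entry count against jobs × num-retained would show whether old checkpoints are ever being removed.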

ubuntu@ip-10-80-52-176:/mnt/shared/flink/ext-checkpoints$ top
top - 13:47:41 up 16:31,  1 user,  load average: 25.85, 25.62, 25.43
Tasks: 297 total,   1 running, 296 sleeping,   0 stopped,   0 zombie
%Cpu(s):  0.3 us,  0.0 sy,  0.0 ni, 98.8 id,  0.7 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem:  32948204 total, 23974844 used,  8973360 free,   144572 buffers
KiB Swap:        0 total,        0 used,        0 free.  7752480 cached Mem
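On Linux, the load average counts both runnable tasks and tasks in uninterruptible sleep (state D), which is usually I/O wait such as blocking on an NFS mount; EFS is mounted over NFS. A load around 25 with the CPUs about 99% idle is consistent with many tasks blocked that way, although this top output alone does not prove it. The sketch below tallies threads by state from /proc/<pid>/task/<tid>/stat. It is Linux-only, TaskStates is a hypothetical name, and it is a quick diagnostic rather than a monitoring tool.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

// Linux-only sketch: tally threads by scheduler state (R, S, D, ...) from /proc.
class TaskStates {

    // The state is the first field after the ')' that closes the command name.
    // The name itself may contain spaces or parentheses, so use the LAST ')'.
    static char parseState(String statLine) {
        int close = statLine.lastIndexOf(')');
        if (close < 0 || close + 2 >= statLine.length()) {
            throw new IllegalArgumentException("unexpected stat line: " + statLine);
        }
        return statLine.charAt(close + 2);
    }

    public static void main(String[] args) throws IOException {
        Map<Character, Integer> counts = new TreeMap<>();
        try (Stream<Path> procs = Files.list(Paths.get("/proc"))) {
            for (Path proc : (Iterable<Path>) procs::iterator) {
                Path tasks = proc.resolve("task");
                // Only numeric entries are processes; skip /proc/self, /proc/meminfo, etc.
                if (!proc.getFileName().toString().matches("\\d+") || !Files.isDirectory(tasks)) {
                    continue;
                }
                try (Stream<Path> threads = Files.list(tasks)) {
                    for (Path t : (Iterable<Path>) threads::iterator) {
                        String stat = new String(Files.readAllBytes(t.resolve("stat")));
                        counts.merge(parseState(stat), 1, Integer::sum);
                    }
                } catch (IOException e) {
                    // The process or thread exited (or is not readable); skip it.
                }
            }
        }
        // Many threads under 'D' would point at tasks blocked on I/O.
        System.out.println(counts);
    }
}
```

If the D count tracks the load average, `ps -eLo state,comm` can then narrow down which processes (for example the JobManager JVM) own the blocked threads.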

We specify Mesos agent attributes so that our Flink containers are allocated
only to a subset of the Mesos agents. However, the Flink JobManager container
does end up on the same physical instance as multiple TaskManager containers.
We are running 65 TaskManagers with 2 slots each, and about 70 jobs are
currently on the cluster.
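For a sense of scale, a rough upper bound on growth if nothing were ever discarded: with the 30-second checkpoint interval from the configuration in this message, a job completes at most 3600 / 30 = 120 checkpoints per hour (fewer if checkpoints take longer than 25 seconds, since the 5-second minimum pause then stretches the cycle). Across about 70 jobs that is up to 70 × 120 = 8,400 checkpoints per hour. The snippet below just does this arithmetic; the figures are estimates, not measurements, and assume one directory entry per completed checkpoint. CheckpointGrowth is a hypothetical name.

```java
// Back-of-envelope estimate: checkpoints accumulated if none were ever discarded.
class CheckpointGrowth {

    // Upper bound: assumes each job completes exactly one checkpoint per interval.
    static long maxCheckpointsPerHour(int jobs, long intervalSeconds) {
        return jobs * (3600 / intervalSeconds);
    }

    static long accumulated(int jobs, long intervalSeconds, double hours) {
        return (long) (maxCheckpointsPerHour(jobs, intervalSeconds) * hours);
    }

    public static void main(String[] args) {
        int jobs = 70;              // "~70 jobs currently on the cluster"
        long intervalSeconds = 30;  // enableCheckpointing(TimeUnit.SECONDS.toMillis(30))
        System.out.println(maxCheckpointsPerHour(jobs, intervalSeconds) + " per hour"); // 8400 per hour
        System.out.println(accumulated(jobs, intervalSeconds, 24) + " per day");        // 201600 per day
    }
}
```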

We use AWS EFS (https://aws.amazon.com/efs/), mounted on all Mesos boxes, to
store the recovery, checkpoint, external checkpoint and savepoint directories.


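import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.CodeAnalysisMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

        executionEnvironment.enableCheckpointing(TimeUnit.SECONDS.toMillis(30));

        CheckpointConfig config = executionEnvironment.getCheckpointConfig();
        config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        config.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(5));

        executionEnvironment.getConfig().setGlobalJobParameters(params);
        executionEnvironment.getConfig().setAutoWatermarkInterval(watermarkInterval.getValue());
        executionEnvironment.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);

        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // fail the job if it fails more than 3 times within 2 minutes; wait 1 second between restarts
        executionEnvironment.setRestartStrategy(RestartStrategies.failureRateRestart(3,
                Time.minutes(2), Time.seconds(1)));

        executionEnvironment.getConfig().setLatencyTrackingInterval(30000);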
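For reference, failureRateRestart(3, Time.minutes(2), Time.seconds(1)) tolerates up to 3 failures within a 2-minute window and waits 1 second before each restart; a further failure inside the window fails the job. The class below is a simplified, hypothetical model of that sliding-window rule (it is not Flink's own restart strategy class, and it ignores the restart delay), handy for sanity-checking the chosen numbers.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of a failure-rate restart policy:
// tolerate at most maxFailures failures within any window of intervalMillis.
class FailureRateModel {
    private final int maxFailures;
    private final long intervalMillis;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRateModel(int maxFailures, long intervalMillis) {
        this.maxFailures = maxFailures;
        this.intervalMillis = intervalMillis;
    }

    // Record a failure at nowMillis. Returns true if the job would be restarted,
    // false if the failure rate is exceeded and the job would fail for good.
    boolean onFailure(long nowMillis) {
        // Forget failures that have slid out of the window.
        while (!failureTimes.isEmpty() && nowMillis - failureTimes.peekFirst() > intervalMillis) {
            failureTimes.removeFirst();
        }
        if (failureTimes.size() >= maxFailures) {
            return false;
        }
        failureTimes.addLast(nowMillis);
        return true;
    }

    public static void main(String[] args) {
        // Numbers from the snippet: 3 failures per 2 minutes.
        FailureRateModel policy = new FailureRateModel(3, 2 * 60_000L);
        System.out.println(policy.onFailure(0));      // true
        System.out.println(policy.onFailure(10_000)); // true
        System.out.println(policy.onFailure(20_000)); // true
        System.out.println(policy.onFailure(30_000)); // false: 4th failure within 2 minutes
    }
}
```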


Would appreciate any insights you might have on this.

Thanks

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703
