Re: External checkpoints not getting cleaned up/discarded - potentially causing high load

2017-09-04 Thread Stefan Richter
Hi Jared,

I just wanted to follow up on the problem that you reported. Are there any new 
insights from your debugging efforts, and does the problem still exist for you?

Best,
Stefan

> On 09.07.2017 at 18:37, Jared Stehler wrote:
> 
> We are using the RocksDB state backend. We had not activated incremental 
> checkpointing, but in the course of debugging this we ended up enabling it, and 
> also moved back to S3 from EFS, as it appeared that EFS was introducing large 
> latencies. I will attempt to provide some profiler data as we are able to 
> analyze further.
> 
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
> 
> 
> 
>> On Jul 3, 2017, at 6:02 AM, Stefan Richter wrote:
>> 
>> Hi,
>> 
>> I have two quick questions about this problem report:
>> 
>> 1) Which state backend are you using?
>> 2) In case you are using RocksDB, did you also activate incremental 
>> checkpointing when moving to Flink 1.3?
>> 
>> Another thing that could be really helpful: if possible, can you attach a 
>> profiler/sampler to your JobManager and figure out the hotspot methods 
>> where most time is spent? This would be very helpful as a starting point 
>> for locating where the problem is caused.
>> 
>> Best,
>> Stefan
>> 
>>> On 29.06.2017 at 18:02, Jared Stehler wrote:
>>> 
>>> We’re seeing our external checkpoints directory grow in an unbounded 
>>> fashion… after upgrading to Flink 1.3.  We are using Flink-Mesos.
>>> 
>>> In 1.2 (HA standalone mode), we saw (correctly) that only the latest 
>>> external checkpoint was being retained (i.e., respecting the 
>>> state.checkpoints.num-retained default of 1).
>>> 
>>> The Mesos agent running the JobManager ends up with a really high load and 
>>> becomes unresponsive… Interestingly enough, there is not much CPU or memory 
>>> pressure, which suggests to us that it's IO- or network-bound. But nothing 
>>> jumps out at us (using iostat/netstat). The biggest difference seems to be 
>>> external checkpoints not getting cleaned up/discarded. What might cause that?
>>> 
>>> ubuntu@ip-10-80-52-176:/mnt/shared/flink/ext-checkpoints$ top
>>> top - 13:47:41 up 16:31,  1 user,  load average: 25.85, 25.62, 25.43
>>> Tasks: 297 total,   1 running, 296 sleeping,   0 stopped,   0 zombie
>>> %Cpu(s):  0.3 us,  0.0 sy,  0.0 ni, 98.8 id,  0.7 wa,  0.0 hi,  0.0 si,  
>>> 0.0 st
>>> KiB Mem:  32948204 total, 23974844 used,  8973360 free,   144572 buffers
>>> KiB Swap:  0 total,  0 used,  0 free.  7752480 cached Mem
>>> 
>>> We specify Mesos agent attributes to ensure that our Flink containers are 
>>> allocated to only a subset of the Mesos slaves…   However, we do end up 
>>> with the Flink JobManager container running on the same physical instance 
>>> as multiple task manager containers. We are running 65 task managers with 2 
>>> slots each, and ~70 jobs currently on the cluster.
>>> 
>>> We use AWS EFS (https://aws.amazon.com/efs/ ) 
>>> mounted on all Mesos boxes to store recovery, checkpoint, external 
>>> checkpoint and save point directories.
>>> 
>>> 
>>> 
>>> executionEnvironment.enableCheckpointing(TimeUnit.SECONDS.toMillis(30));
>>> 
>>> CheckpointConfig config = 
>>> executionEnvironment.getCheckpointConfig();
>>> 
>>> config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>> config.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(5));
>>> 
>>> executionEnvironment.getConfig().setGlobalJobParameters(params);
>>> 
>>> executionEnvironment.getConfig().setAutoWatermarkInterval(watermarkInterval.getValue());
>>> 
>>> executionEnvironment.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);
>>> 
>>> 
>>> executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> 
>>> // fail the job if it restarts more than 3 times in 2 minutes, with a 
>>> // 1 second delay between restarts
>>> 
>>> executionEnvironment.setRestartStrategy(
>>> RestartStrategies.failureRateRestart(3, Time.minutes(2), Time.seconds(1)));
>>> 
>>> executionEnvironment.getConfig().setLatencyTrackingInterval(3);
>>> 
>>> 
>>> Would appreciate any insights you might have on this. 
>>> 
>>> Thanks
>>> 
>>> --
>>> Jared Stehler
>>> Chief Architect - Intellify Learning
>>> o: 617.701.6330 x703
>>> 
>>> 
>>> 
>> 
> 



Re: External checkpoints not getting cleaned up/discarded - potentially causing high load

2017-07-09 Thread Jared Stehler
We are using the RocksDB state backend. We had not activated incremental 
checkpointing, but in the course of debugging this we ended up enabling it, and 
also moved back to S3 from EFS, as it appeared that EFS was introducing large 
latencies. I will attempt to provide some profiler data as we are able to 
analyze further.
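
For reference, enabling incremental checkpointing with the RocksDB backend in
Flink 1.3 looks roughly like the sketch below; the S3 URI and the class name are
placeholders for illustration, not our actual setup:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IncrementalCheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // "s3://checkpoint-bucket/flink/checkpoints" is a placeholder URI.
            // The second constructor argument enables incremental checkpointing (Flink 1.3+).
            RocksDBStateBackend backend =
                    new RocksDBStateBackend("s3://checkpoint-bucket/flink/checkpoints", true);
            env.setStateBackend(backend);

            // ... job topology and env.execute(...) would follow here
        }
    }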

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703



> On Jul 3, 2017, at 6:02 AM, Stefan Richter wrote:
> 
> Hi,
> 
> I have two quick questions about this problem report:
> 
> 1) Which state backend are you using?
> 2) In case you are using RocksDB, did you also activate incremental 
> checkpointing when moving to Flink 1.3?
> 
> Another thing that could be really helpful: if possible, can you attach a 
> profiler/sampler to your JobManager and figure out the hotspot methods 
> where most time is spent? This would be very helpful as a starting point 
> for locating where the problem is caused.
> 
> Best,
> Stefan
> 
>> On 29.06.2017 at 18:02, Jared Stehler wrote:
>> 
>> We’re seeing our external checkpoints directory grow in an unbounded 
>> fashion… after upgrading to Flink 1.3.  We are using Flink-Mesos.
>> 
>> In 1.2 (HA standalone mode), we saw (correctly) that only the latest 
>> external checkpoint was being retained (i.e., respecting the 
>> state.checkpoints.num-retained default of 1).
>> 
>> The Mesos agent running the JobManager ends up with a really high load and 
>> becomes unresponsive… Interestingly enough, there is not much CPU or memory 
>> pressure, which suggests to us that it's IO- or network-bound. But nothing 
>> jumps out at us (using iostat/netstat). The biggest difference seems to be 
>> external checkpoints not getting cleaned up/discarded. What might cause that?
>> 
>> ubuntu@ip-10-80-52-176:/mnt/shared/flink/ext-checkpoints$ top
>> top - 13:47:41 up 16:31,  1 user,  load average: 25.85, 25.62, 25.43
>> Tasks: 297 total,   1 running, 296 sleeping,   0 stopped,   0 zombie
>> %Cpu(s):  0.3 us,  0.0 sy,  0.0 ni, 98.8 id,  0.7 wa,  0.0 hi,  0.0 si,  0.0 
>> st
>> KiB Mem:  32948204 total, 23974844 used,  8973360 free,   144572 buffers
>> KiB Swap:  0 total,  0 used,  0 free.  7752480 cached Mem
>> 
>> We specify Mesos agent attributes to ensure that our Flink containers are 
>> allocated to only a subset of the Mesos slaves…   However, we do end up with 
>> the Flink JobManager container running on the same physical instance as 
>> multiple task manager containers. We are running 65 task managers with 2 
>> slots each, and ~70 jobs currently on the cluster.
>> 
>> We use AWS EFS (https://aws.amazon.com/efs/ ) 
>> mounted on all Mesos boxes to store recovery, checkpoint, external 
>> checkpoint and save point directories.
>> 
>> 
>> 
>> executionEnvironment.enableCheckpointing(TimeUnit.SECONDS.toMillis(30));
>> 
>> CheckpointConfig config = executionEnvironment.getCheckpointConfig();
>> 
>> config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> config.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(5));
>> 
>> executionEnvironment.getConfig().setGlobalJobParameters(params);
>> 
>> executionEnvironment.getConfig().setAutoWatermarkInterval(watermarkInterval.getValue());
>> 
>> executionEnvironment.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);
>> 
>> 
>> executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> 
>> // fail the job if it restarts more than 3 times in 2 minutes, with a 
>> // 1 second delay between restarts
>> 
>> executionEnvironment.setRestartStrategy(
>> RestartStrategies.failureRateRestart(3, Time.minutes(2), Time.seconds(1)));
>> 
>> executionEnvironment.getConfig().setLatencyTrackingInterval(3);
>> 
>> 
>> Would appreciate any insights you might have on this.
>> 
>> Thanks
>> 
>> --
>> Jared Stehler
>> Chief Architect - Intellify Learning
>> o: 617.701.6330 x703
>> 
>> 
>> 
> 





Re: External checkpoints not getting cleaned up/discarded - potentially causing high load

2017-07-03 Thread Ufuk Celebi
On Mon, Jul 3, 2017 at 12:02 PM, Stefan Richter wrote:
> Another thing that could be really helpful: if possible, can you attach a
> profiler/sampler to your JobManager and figure out the hotspot methods
> where most time is spent? This would be very helpful as a starting point
> for locating where the problem is caused.

A stack trace will also be helpful to see whether some threads are stuck.
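
In practice, jstack against the JobManager PID is the simplest way to grab one.
Purely as a sketch of what such a dump contains, a minimal Java snippet would
look like the following (it only sees the JVM it runs in, and the class name is
made up for illustration):

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;

    public class ThreadDump {
        public static void main(String[] args) {
            // Snapshot all live threads of the current JVM, including lock information.
            ThreadMXBean threads = ManagementFactory.getThreadMXBean();
            for (ThreadInfo info : threads.dumpAllThreads(true, true)) {
                System.out.println('"' + info.getThreadName() + "\" state=" + info.getThreadState());
                for (StackTraceElement frame : info.getStackTrace()) {
                    System.out.println("    at " + frame);
                }
                System.out.println();
            }
        }
    }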

If it is possible to run on Mesos without HA mode (@Till: is that
possible?), it might be worthwhile to re-run the setup that way to get a
hint whether the problem is related to HA.

– Ufuk


Re: External checkpoints not getting cleaned up/discarded - potentially causing high load

2017-07-03 Thread Stefan Richter
Hi,

I have two quick questions about this problem report:

1) Which state backend are you using?
2) In case you are using RocksDB, did you also activate incremental 
checkpointing when moving to Flink 1.3?

Another thing that could be really helpful: if possible, can you attach a 
profiler/sampler to your JobManager and figure out the hotspot methods where 
most time is spent? This would be very helpful as a starting point for locating 
where the problem is caused.

Best,
Stefan

> On 29.06.2017 at 18:02, Jared Stehler wrote:
> 
> We’re seeing our external checkpoints directory grow in an unbounded fashion… 
> after upgrading to Flink 1.3.  We are using Flink-Mesos.
> 
> In 1.2 (HA standalone mode), we saw (correctly) that only the latest external 
> checkpoint was being retained (i.e., respecting the 
> state.checkpoints.num-retained default of 1).
> 
> The Mesos agent running the JobManager ends up with a really high load and 
> becomes unresponsive… Interestingly enough, there is not much CPU or memory 
> pressure, which suggests to us that it's IO- or network-bound. But nothing 
> jumps out at us (using iostat/netstat). The biggest difference seems to be 
> external checkpoints not getting cleaned up/discarded. What might cause that?
> 
> ubuntu@ip-10-80-52-176:/mnt/shared/flink/ext-checkpoints$ top
> top - 13:47:41 up 16:31,  1 user,  load average: 25.85, 25.62, 25.43
> Tasks: 297 total,   1 running, 296 sleeping,   0 stopped,   0 zombie
> %Cpu(s):  0.3 us,  0.0 sy,  0.0 ni, 98.8 id,  0.7 wa,  0.0 hi,  0.0 si,  0.0 
> st
> KiB Mem:  32948204 total, 23974844 used,  8973360 free,   144572 buffers
> KiB Swap:  0 total,  0 used,  0 free.  7752480 cached Mem
> 
> We specify Mesos agent attributes to ensure that our Flink containers are 
> allocated to only a subset of the Mesos slaves…   However, we do end up with 
> the Flink JobManager container running on the same physical instance as 
> multiple task manager containers. We are running 65 task managers with 2 
> slots each, and ~70 jobs currently on the cluster.
> 
> We use AWS EFS (https://aws.amazon.com/efs/ ) 
> mounted on all Mesos boxes to store recovery, checkpoint, external checkpoint 
> and save point directories.
> 
> 
> 
> executionEnvironment.enableCheckpointing(TimeUnit.SECONDS.toMillis(30));
> 
> CheckpointConfig config = executionEnvironment.getCheckpointConfig();
> 
> config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> config.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(5));
> 
> executionEnvironment.getConfig().setGlobalJobParameters(params);
> 
> executionEnvironment.getConfig().setAutoWatermarkInterval(watermarkInterval.getValue());
> 
> executionEnvironment.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);
> 
> 
> executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 
> // fail the job if it restarts more than 3 times in 2 minutes, with a 
> // 1 second delay between restarts
> 
> executionEnvironment.setRestartStrategy(
> RestartStrategies.failureRateRestart(3, Time.minutes(2), Time.seconds(1)));
> 
> executionEnvironment.getConfig().setLatencyTrackingInterval(3);
> 
> 
> Would appreciate any insights you might have on this. 
> 
> Thanks
> 
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
> 
> 
> 



External checkpoints not getting cleaned up/discarded - potentially causing high load

2017-06-29 Thread Jared Stehler
We’re seeing our external checkpoints directory grow in an unbounded fashion… 
after upgrading to Flink 1.3.  We are using Flink-Mesos.

In 1.2 (HA standalone mode), we saw (correctly) that only the latest external 
checkpoint was being retained (i.e., respecting the 
state.checkpoints.num-retained default of 1).
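
For context, the two cluster-side settings involved are
state.checkpoints.num-retained and state.checkpoints.dir. These would normally
live in flink-conf.yaml rather than in code; the snippet below is only an
illustrative sketch using Flink's Configuration class, with a made-up class name,
and the directory just mirrors the ext-checkpoints mount shown in the top output
further down:

    import org.apache.flink.configuration.Configuration;

    public class CheckpointRetentionConfig {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // Keep only the latest externalized checkpoint (this is also the default).
            conf.setInteger("state.checkpoints.num-retained", 1);

            // Target directory for externalized checkpoint metadata; illustrative only.
            conf.setString("state.checkpoints.dir", "file:///mnt/shared/flink/ext-checkpoints");

            System.out.println(conf.getInteger("state.checkpoints.num-retained", -1));
            System.out.println(conf.getString("state.checkpoints.dir", null));
        }
    }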

The Mesos agent running the JobManager ends up with a really high load and 
becomes unresponsive… Interestingly enough, there is not much CPU or memory 
pressure, which suggests to us that it's IO- or network-bound. But nothing 
jumps out at us (using iostat/netstat). The biggest difference seems to be 
external checkpoints not getting cleaned up/discarded. What might cause that?

ubuntu@ip-10-80-52-176:/mnt/shared/flink/ext-checkpoints$ top
top - 13:47:41 up 16:31,  1 user,  load average: 25.85, 25.62, 25.43
Tasks: 297 total,   1 running, 296 sleeping,   0 stopped,   0 zombie
%Cpu(s):  0.3 us,  0.0 sy,  0.0 ni, 98.8 id,  0.7 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem:  32948204 total, 23974844 used,  8973360 free,   144572 buffers
KiB Swap:  0 total,  0 used,  0 free.  7752480 cached Mem

We specify Mesos agent attributes to ensure that our Flink containers are 
allocated to only a subset of the Mesos slaves…   However, we do end up with 
the Flink JobManager container running on the same physical instance as 
multiple task manager containers. We are running 65 task managers with 2 slots 
each, and ~70 jobs currently on the cluster.

We use AWS EFS (https://aws.amazon.com/efs/ ) 
mounted on all Mesos boxes to store recovery, checkpoint, external checkpoint 
and save point directories.


executionEnvironment.enableCheckpointing(TimeUnit.SECONDS.toMillis(30));

CheckpointConfig config = executionEnvironment.getCheckpointConfig();

config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(5));

executionEnvironment.getConfig().setGlobalJobParameters(params);

executionEnvironment.getConfig().setAutoWatermarkInterval(watermarkInterval.getValue());

executionEnvironment.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);


executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// fail the job if it restarts more than 3 times in 2 minutes, with a
// 1 second delay between restarts

executionEnvironment.setRestartStrategy(
        RestartStrategies.failureRateRestart(3, Time.minutes(2), Time.seconds(1)));

executionEnvironment.getConfig().setLatencyTrackingInterval(3);


Would appreciate any insights you might have on this.

Thanks

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703




