I recently added some better visibility into the metrics we're gathering from 
Flink. My Flink cluster died again due to the "Not enough free slots available 
to run the job" problem, and this time I can see that the number of registered 
task managers went down from 11 to 7, then waffled and only ever got back up to 
10 (one short of the requested amount) before dropping to 0 just before the 
cluster died. This would seem to explain why there weren't sufficient slots 
(given that we were probably using them all or nearly all)… The metric of 
"running jobs" went down from 5 to 3 during this time period as well. So the 
problem seems to be loss of taskmanagers due to errors (not yet sure what 
exactly as I have to delve into logs).
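
The slot arithmetic behind this can be sketched roughly in Python. The 
taskmanager counts (11, 10, 7, 0) come from the metrics above; the number of 
slots per taskmanager and the number of slots the jobs need are hypothetical 
values chosen only for illustration.

```python
# Hypothetical values: the thread does not state slots per taskmanager
# or how many slots the running jobs need.
SLOTS_PER_TASK_MANAGER = 4
REQUIRED_SLOTS = 44  # assumes the jobs use every slot of the 11 requested taskmanagers


def total_slots(registered_task_managers, slots_per_task_manager):
    """Slots the scheduler can see for the given number of taskmanagers."""
    return registered_task_managers * slots_per_task_manager


def can_schedule(required_slots, registered_task_managers, slots_per_task_manager):
    """True if the registered taskmanagers offer enough slots for the jobs."""
    return required_slots <= total_slots(registered_task_managers, slots_per_task_manager)


if __name__ == "__main__":
    for task_managers in (11, 10, 7, 0):
        slots = total_slots(task_managers, SLOTS_PER_TASK_MANAGER)
        ok = can_schedule(REQUIRED_SLOTS, task_managers, SLOTS_PER_TASK_MANAGER)
        print(task_managers, "taskmanagers:", slots, "slots, enough:", ok)
```

Under these assumptions, losing even one taskmanager leaves too few slots 
whenever the jobs already occupy all of them.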

The other thing I have to figure out is restoring the jobs… I thought that HA 
would start the jobs back up again if Flink died & I re-launched it, but that 
doesn't appear to be the case.


From: Stephan Ewen <se...@apache.org>
Date: Thursday, January 5, 2017 at 7:52 AM
To: <user@flink.apache.org>
Subject: Re: Rapidly failing job eventually causes "Not enough free slots"

Another thought on the container failure:

In 1.1, the user code is loaded dynamically whenever a Task is started. That 
means that on every task restart the code is reloaded. For that to work 
properly, class unloading needs to happen, or the permgen will eventually 
overflow.

Class unloading can be prevented if the user functions leave references around 
as "GC roots", such as threads or references in registries.

In Flink 1.2, YARN will put the user code into the application classpath, so 
the code does not need to be reloaded on every restart. That should solve that 
issue. To "simulate" that behavior in Flink 1.1, put your application code jars 
into the "lib" folder.

Best,
Stephan


On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin <yuri.ruc...@gmail.com> wrote:
Hi,

I've faced a similar issue recently. Hope sharing my findings will help. The 
problem can be split into 2 parts:

Source of container failures
The logs you provided indicate that YARN kills its containers for exceeding 
memory limits. The important point here is that the memory limit covers JVM 
heap memory plus off-heap memory. So if off-heap memory usage is high, YARN may 
kill containers even though JVM heap consumption is fine. To address this, 
Flink reserves a share of container memory for off-heap memory. How much is 
reserved is controlled by the yarn.heap-cutoff-ratio and yarn.heap-cutoff-min 
settings. By default 25% of the requested container memory is reserved for 
off-heap. This seems to be a good start, but one should experiment and tune to 
meet their job specifics.
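
A minimal Python sketch of that memory split, as I understand it: the cutoff is 
the larger of (container memory * ratio) and the minimum, and the JVM heap gets 
the rest. The 0.25 ratio is the default mentioned above; the 384 MB minimum and 
the container sizes are assumed values for illustration, so check them against 
your own configuration. The limit check is a simplification of what YARN does.

```python
def split_container_memory(container_mb, cutoff_ratio, cutoff_min_mb):
    """Return (heap_mb, cutoff_mb) for a container of container_mb megabytes.

    The cutoff is the share held back for off-heap memory; the remainder
    is what the JVM heap may use.
    """
    cutoff_mb = max(int(container_mb * cutoff_ratio), cutoff_min_mb)
    return container_mb - cutoff_mb, cutoff_mb


def exceeds_container_limit(container_mb, heap_used_mb, off_heap_used_mb):
    """Simplified check: heap plus off-heap usage is compared to the limit."""
    return heap_used_mb + off_heap_used_mb > container_mb


if __name__ == "__main__":
    # Assumed example values: 4096 MB container, 384 MB minimum cutoff.
    heap_mb, cutoff_mb = split_container_memory(4096, 0.25, 384)
    print("heap:", heap_mb, "MB, reserved for off-heap:", cutoff_mb, "MB")
    # Heap looks healthy, but off-heap usage pushes the total over the limit.
    print("killed:", exceeds_container_limit(4096, 3000, 1200))
```

In this sketch a container can be over its limit while the heap is well below 
its maximum, which matches the symptom of healthy-looking heap graphs combined 
with "Pmem limit exceeded".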

It's also worthwhile to figure out what consumes the off-heap memory. Is it 
Flink managed memory moved off heap (taskmanager.memory.off-heap = true)? Is it 
some external library allocating something off heap? Is it your own code?

How Flink handles task manager failures
Whenever a task manager fails, the Flink jobmanager decides whether it should:
- reallocate the failed task manager container
- fail the application entirely
These decisions can be guided by certain configuration settings 
(https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn).
With default settings, the job manager reallocates task manager containers up 
to the point when N failures have been observed, where N is the number of 
requested task managers. After that the application is stopped.

According to the logs, you have a finite number in 
yarn.maximum-failed-containers (11, as I can see from the logs - this may be 
set by Flink if not provided explicitly). On the 12th container failure, the 
jobmanager gives up and the application stops. I'm not sure why it keeps 
reporting not enough slots after that point. In my experience this may happen 
when a job eats up all the available slots, so that after a container failure 
its tasks cannot be restarted in other (live) containers. But I believe once 
the decision to stop the application is made, there should not be any further 
attempts to restart the job, hence no logs like those. Hopefully, someone else 
will explain this to us :)

In my case I made the jobmanager restart containers indefinitely by setting 
yarn.maximum-failed-containers = -1, so that a taskmanager failure never 
results in application death. Note that this is unlikely to be a good choice 
for a batch job.
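
The stop decision can be sketched in Python as below. This is inferred from the 
log message ("number of failed containers (12) exceeded the maximum failed 
containers (11)") and from the -1 setting described above, not taken from the 
Flink source; the function and constant names are hypothetical.

```python
UNLIMITED = -1  # value of yarn.maximum-failed-containers that disables the limit


def should_stop_session(failed_containers, max_failed_containers):
    """Return True once the failure count exceeds the configured maximum.

    Hypothetical helper: with the limit set to -1 the session is never
    stopped because of failed containers.
    """
    if max_failed_containers == UNLIMITED:
        return False
    return failed_containers > max_failed_containers


if __name__ == "__main__":
    print(should_stop_session(11, 11))    # still within the limit
    print(should_stop_session(12, 11))    # the case seen in the logs
    print(should_stop_session(1000, UNLIMITED))
```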

Regards,
Yury

2017-01-05 3:21 GMT+03:00 Shannon Carey <sca...@expedia.com>:
In Flink 1.1.3 on emr-5.2.0, I've experienced a particular problem twice and 
I'm wondering if anyone has some insight about it.

In both cases, we deployed a job that fails very frequently (within 15s-1m of 
launch). Eventually, the Flink cluster dies.

The sequence of events looks something like this:

  *   bad job is launched
  *   bad job fails & is restarted many times (I didn't have the "failure-rate" 
restart strategy configuration right)
  *   Task manager logs: org.apache.flink.yarn.YarnTaskManagerRunner (SIGTERM 
handler): RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
  *   At this point, the YARN resource manager also logs the container failure
  *   More logs: Container 
ResourceID{resourceId='container_1481658997383_0003_01_000013'} failed. Exit 
status: Pmem limit exceeded (-104)
  *   Diagnostics for container 
ResourceID{resourceId='container_1481658997383_0003_01_000013'} in state 
COMPLETE : exitStatus=Pmem limit exceeded (-104) diagnostics=Container 
[pid=21246,containerID=container_1481658997383_0003_01_000013] is running 
beyond physical memory limits. Current usage: 5.6 GB of 5.6 GB physical memory 
used; 9.6 GB of 28.1 GB virtual memory used. Killing container.
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Total number of failed containers so far: 12
Stopping YARN session because the number of failed containers (12) exceeded the 
maximum failed containers (11). This number is controlled by the 
'yarn.maximum-failed-containers' configuration setting. By default its the 
number of requested containers.
  *   From here onward, the logs repeatedly show that jobs fail to restart due 
to "org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Not enough free slots available to run the job. You can decrease the operator 
parallelism or increase the number of slots per TaskManager in the 
configuration. Task to schedule: < Attempt #68 (Source: …) @ (unassigned) - 
[SCHEDULED] > with groupID < 73191c171abfff61fb5102c161274145 > in sharing 
group < SlotSharingGroup [73191c171abfff61fb5102c161274145, 
19596f7834805c8409c419f0edab1f1b] >. Resources available to scheduler: Number 
of instances=0, total number of slots=0, available slots=0"
  *   Eventually, Flink stops for some reason (with another SIGTERM message), 
presumably because of YARN

Does anyone have an idea why a bad job repeatedly failing would eventually 
result in the Flink cluster dying?

Any idea why I'd get "Pmem limit exceeded" or "Not enough free slots available 
to run the job"? The JVM heap usage and the free memory on the machines both 
look reasonable in my monitoring dashboards. Could it possibly be a memory leak 
due to classloading or something?

Thanks for any help or suggestions you can provide! I am hoping that the 
"failure-rate" restart strategy will help avoid this issue in the future, but 
I'd also like to understand what's making the cluster die so that I can prevent 
it.
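
For reference, the idea behind a failure-rate restart policy can be sketched in 
Python as a sliding window over failure timestamps. This is only an 
illustration of the concept, not Flink's implementation; the class name, the 
parameter names and the numbers in the example are hypothetical.

```python
from collections import deque


class FailureRateTracker:
    """Allow restarts until too many failures fall inside one time window."""

    def __init__(self, max_failures_per_interval, interval_seconds):
        self.max_failures = max_failures_per_interval
        self.interval = interval_seconds
        self.failures = deque()  # timestamps of recent failures, oldest first

    def record_failure(self, now):
        """Record a failure at time `now` (seconds); return True if a restart is allowed."""
        # Forget failures that are older than the window.
        while self.failures and now - self.failures[0] > self.interval:
            self.failures.popleft()
        self.failures.append(now)
        return len(self.failures) <= self.max_failures


if __name__ == "__main__":
    # Hypothetical policy: at most 3 failures per 60 seconds.
    tracker = FailureRateTracker(3, 60)
    for timestamp in (0, 10, 20, 30, 200):
        print(timestamp, tracker.record_failure(timestamp))
```

With a policy like this, a job that fails every few seconds is given up on 
quickly instead of being restarted over and over.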

-Shannon

