Till, thanks for the clarification. Yes, that situation is undesirable too.
In our case, restarting the jobmanager could also recover the job from the akka association lock-out. It was actually an issue on the jobmanager side (a high GC pause) that caused the akka failure. Do we have something like "jobmanager.exit-on-fatal-akka-error: true"? Does it make sense to terminate the jobmanager in this case?

Thanks,
Steven

On Sun, May 13, 2018 at 1:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Steven,

the reason why we did not turn this feature on by default is that in the case of a true JM failure, all of the TMs will think that they were quarantined, which triggers their shutdown. Depending on how many container restarts you have left on YARN, for example, this can lead to a situation where Flink is not able to recover the job even though it only needed to restart the JM container.

Cheers,
Till

On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <stevenz...@gmail.com> wrote:

Till,

We ran into the same issue. It started with a high GC pause that caused the jobmanager to lose its ZooKeeper connection and leadership, and caused the jobmanager to quarantine the taskmanager in akka. Once quarantined, the akka association between jobmanager and taskmanager is locked forever.

Your suggestion of "taskmanager.exit-on-fatal-akka-error: true" worked: the taskmanager exited and a replacement taskmanager joined the cluster afterwards. I am wondering why this is not defaulted to "true". Any downside?

Thanks,
Steven

On Sat, Feb 24, 2018 at 7:02 AM, ashish pok <ashish...@yahoo.com> wrote:

@Jelmer, this is Till's last response on the issue.

-- Ashish

On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi,

this sounds like a serious regression with respect to Flink 1.3.2, and we should definitely find out what's causing this problem. From what I see in the logs, the following happens:

For some time, the JobManager seems to no longer receive heartbeats from the TaskManager. This could be, for example, due to long GC pauses or heavy load which starves the ActorSystem threads responsible for sending the heartbeats. Because of this, the TM's ActorSystem is quarantined, which effectively renders it useless because the JM will henceforth ignore all messages from it. The only way to resolve this problem is to restart the ActorSystem. By setting taskmanager.exit-on-fatal-akka-error to true in flink-conf.yaml, a quarantined TM will shut down. If you run the Flink cluster on YARN, a substitute TM will then be started, provided you still have some container restarts left. That way, the system should be able to recover.

Additionally, you could try playing around with akka.watch.heartbeat.interval and akka.watch.heartbeat.pause, which control the heartbeat interval and the acceptable pause. By increasing the latter, the system should tolerate longer GC pauses and periods of high load.

However, this only addresses the symptoms, and I'd like to find out what's causing the problem. In order to debug it further, it would be really helpful to obtain the logs of the JobManager and the TaskManagers at DEBUG log level and with taskmanager.debug.memory.startLogThread set to true. Additionally, it would be interesting to see what's happening on the TaskManagers when you observe high load, so obtaining a profiler dump via VisualVM would be great.
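To make the configuration side concrete, the relevant flink-conf.yaml entries could look like the following; the heartbeat values are only illustrative starting points, not recommendations:

    # Let a quarantined TaskManager shut itself down so that, on YARN,
    # a replacement container can be started.
    taskmanager.exit-on-fatal-akka-error: true

    # Heartbeat interval and acceptable pause for Akka's DeathWatch.
    # Example values; increase the pause to tolerate longer GC pauses.
    akka.watch.heartbeat.interval: 10 s
    akka.watch.heartbeat.pause: 60 s

    # Periodically log TaskManager memory usage for debugging.
    taskmanager.debug.memory.startLogThread: true

(The DEBUG log level itself is configured in log4j.properties rather than in flink-conf.yaml.)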
And last but not least, it also helps to learn more about the job you're running. What kind of connectors is it using? Are you using Flink's metric system? How is the Flink cluster deployed? Which other libraries are you using in your job?

Thanks a lot for your help!

Cheers,
Till

On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cre...@gmail.com> wrote:

I've seen a similar issue while running successive Flink SQL batches on 1.4. In my case, the Job Manager would fail with the log output about unreachability (with an additional statement about something going "horribly wrong"). Under workload pressure, I reverted to 1.3.2, where everything works perfectly, but we will try again soon on 1.4. When we do, I will post the actual log output.

This was on YARN in AWS, with akka.ask.timeout = 60s.

On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:

I haven't gotten much further with this. It doesn't look GC related - at least the GC counters were not that atrocious. However, my main concern is: once the load subsides, why aren't the TM and JM connecting again? That doesn't look normal. I could definitely tell the JM was listening on the port, and from the logs it does appear the TM is trying to message a JM that is still alive.

Thanks, Ashish

On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <lassenederga...@gmail.com> wrote:

Hi,

Did you find a reason for the detaching? I sometimes see the same on our system running Flink 1.4 on DC/OS. I have enabled taskmanager.debug.memory.startLogThread for debugging.

Best regards,
Lasse Nedergaard

On Jan 20, 2018, at 12:57, Kien Truong <duckientru...@gmail.com> wrote:

Hi,

You should enable and check your garbage collection log. We've encountered cases where the Task Manager disassociated due to long GC pauses.

Regards,
Kien

On 1/20/2018 1:27 AM, ashish pok wrote:

Hi All,

We have hit some load-related issues and I was wondering if anyone has suggestions. We are noticing task managers and job managers becoming detached from each other under load and never really syncing up again. As a result, the Flink session shows 0 slots available for processing. Even though the apps are configured to restart, that isn't really helping, as there are no slots available to run them.

Here are excerpts from the logs that seemed relevant.
(I am trimming out the rest of the logs for brevity.)

*Job Manager:*

2018-01-19 12:38:00,423 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:00,792 INFO org.apache.flink.runtime.jobmanager.JobManager - Maximum heap size: 16384 MiBytes
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - Hadoop version: 2.6.5
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xms16384m
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xmx16384m
2018-01-19 12:38:00,795 INFO org.apache.flink.runtime.jobmanager.JobManager - -XX:+UseG1GC
2018-01-19 12:38:00,908 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:00,908 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384

2018-01-19 12:53:34,671 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
2018-01-19 12:53:34,676 INFO org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.

-- So once the Flink session boots up, we are hitting it with pretty heavy load, which typically results in the WARN above.

*Task Manager:*

2018-01-19 12:38:01,002 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.6.5
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms16384M
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx16384M
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:MaxDirectMemorySize=8388607T
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:+UseG1GC
2018-01-19 12:38:01,392 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:01,392 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384

2018-01-19 12:54:48,626 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
2018-01-19 12:54:48,690 INFO akka.remote.Remoting - Quarantined address [akka.tcp://flink@<jm-host>:6123] is still unreachable or has not been restarted. Keeping it quarantined.
2018-01-19 12:54:48,774 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
2018-01-19 12:54:48,833 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]

<bunch of ERRORs about operations not shutting down properly - assuming because the JM is unreachable>

2018-01-19 12:56:51,244 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
2018-01-19 12:56:51,253 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]

So the bottom line is that the JM and TM couldn't communicate under load, which is obviously not good. I tried bumping up akka.tcp.timeout as well, but it didn't help either. So my question is: after all processing is halted and no new data is being picked up, shouldn't this environment self-heal? Are there other things I can look at besides extending timeouts?

Thanks,

Ashish
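One concrete way to act on Kien's garbage-collection-log suggestion above is to pass GC-logging flags to the Flink JVMs via env.java.opts in flink-conf.yaml. A minimal sketch, assuming a Java 8 JVM (to match the G1 flags visible in the logs) and an example log path:

    # GC logging for the Flink JVMs (Java 8 flags; the log path is
    # only an example and should point somewhere writable on each host).
    env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/tmp/flink-gc.log"

Long pauses in this log that line up with the "Detected unreachable" warnings would support the GC-pause theory discussed in the thread.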