Till,

Thanks for the clarification. Yes, that situation is undesirable as well.

In our case, restarting the jobmanager could also recover the job from the
akka association lock-out. It was actually an issue (a high GC pause) on the
jobmanager side that caused the akka failure.

Do we have something like "jobmanager.exit-on-fatal-akka-error: true"? Does it
make sense to terminate the jobmanager in this case?

Thanks,
Steven

On Sun, May 13, 2018 at 1:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Steven,
>
> the reason why we did not turn this feature on by default was that in
> case of a true JM failure, all of the TMs will think that they got
> quarantined, which triggers their shutdown. Depending on how many container
> restarts you have left on Yarn, for example, this can lead to a situation
> where Flink is not able to recover the job even though it only needed to
> restart the JM container.
>
> Cheers,
> Till
>
> On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Till,
>>
>> We ran into the same issue. It started with a high GC pause that caused the
>> jobmanager to lose its ZooKeeper connection and leadership, which caused the
>> jobmanager to quarantine the taskmanager in akka. Once quarantined, the akka
>> association between jobmanager and taskmanager is locked forever.
>>
>> Your suggestion of "taskmanager.exit-on-fatal-akka-error: true" worked.
>> The taskmanager exited and a replacement taskmanager joined the cluster
>> afterwards. I am wondering why this is not defaulted to "true". Any
>> downside?
>>
>> Thanks,
>> Steven
>>
>> On Sat, Feb 24, 2018 at 7:02 AM, ashish pok <ashish...@yahoo.com> wrote:
>>
>>> @Jelmer, this is Till's last response on the issue.
>>>
>>> -- Ashish
>>>
>>> On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann
>>> <trohrm...@apache.org> wrote:
>>> Hi,
>>>
>>> this sounds like a serious regression wrt Flink 1.3.2 and we should
>>> definitely find out what's causing this problem. From what I can see in
>>> the logs, the following happens:
>>>
>>> For some time the JobManager seems to no longer receive heartbeats from
>>> the TaskManager. This could be, for example, due to long GC pauses or heavy
>>> load which starves the ActorSystem's threads that are responsible for
>>> sending the heartbeats. Due to this, the TM's ActorSystem is quarantined,
>>> which effectively renders it useless because the JM will henceforth
>>> ignore all messages from this system. The only way to resolve this
>>> problem is to restart the ActorSystem. By setting
>>> taskmanager.exit-on-fatal-akka-error to true in flink-conf.yaml, a
>>> quarantined TM will shut down. If you run the Flink cluster on Yarn, then
>>> a new substitute TM will be started if you still have some container
>>> restarts left. That way, the system should be able to recover.
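>>> For reference, a minimal flink-conf.yaml sketch of that setting (not a
>>> complete configuration):
>>>
>>>     # Let a quarantined TaskManager terminate itself so that, e.g. on Yarn,
>>>     # a fresh container can be started in its place
>>>     taskmanager.exit-on-fatal-akka-error: true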
>>>
>>> Additionally, you could try to play around with akka.watch.heartbeat.interval
>>> and akka.watch.heartbeat.pause, which control the heartbeat interval and the
>>> acceptable pause. By increasing the latter, the system should tolerate
>>> longer GC pauses and periods of high load.
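>>> A flink-conf.yaml sketch of these two settings (the values below are only
>>> illustrative placeholders, not recommendations):
>>>
>>>     # How often Akka death-watch heartbeats are sent
>>>     akka.watch.heartbeat.interval: 10 s
>>>     # How long missing heartbeats are tolerated before the peer is marked
>>>     # unreachable; increase this to ride out longer GC pauses
>>>     akka.watch.heartbeat.pause: 120 s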
>>>
>>> However, this only addresses the symptoms, and I'd like to find out what's
>>> causing the problem. In order to debug it further, it would be really
>>> helpful to obtain the logs of the JobManager and the TaskManagers on DEBUG
>>> log level and with taskmanager.debug.memory.startLogThread set to true (a
>>> configuration sketch follows below). Additionally, it would be interesting
>>> to see what's happening on the TaskManagers when you observe high load, so
>>> obtaining a profiler dump via VisualVM would be great. And last but not
>>> least, it also helps to learn more about the job you're running. What kind
>>> of connectors is it using? Are you using Flink's metric system? How is the
>>> Flink cluster deployed? Which other libraries are you using in your job?
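>>> As a sketch, gathering those logs could look like this, assuming the
>>> default log4j.properties shipped with the Flink distribution (where the
>>> file appender is named "file"):
>>>
>>>     # log4j.properties: raise the root log level from INFO to DEBUG
>>>     log4j.rootLogger=DEBUG, file
>>>
>>>     # flink-conf.yaml: periodically log memory usage on the TaskManagers
>>>     taskmanager.debug.memory.startLogThread: true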
>>>
>>> Thanks a lot for your help!
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cre...@gmail.com> wrote:
>>>
>>> I've seen a similar issue while running successive Flink SQL batches on
>>> 1.4. In my case, the Job Manager would fail with the log output about
>>> unreachability (with an additional statement about something going
>>> "horribly wrong"). Under workload pressure, I reverted to 1.3.2 where
>>> everything works perfectly, but we will try again soon on 1.4. When we do I
>>> will post the actual log output.
>>>
>>> This was on YARN in AWS, with akka.ask.timeout = 60s.
>>>
>>> On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <ashish...@yahoo.com>
>>> wrote:
>>>
>>> I haven’t gotten much further with this. It doesn’t look GC related
>>> - at least the GC counters were not that atrocious. However, my main concern
>>> is: once the load subsides, why aren’t the TM and JM connecting again? That
>>> doesn’t look normal. I could definitely tell the JM was listening on the
>>> port, and from the logs it does appear the TM is trying to message the JM,
>>> which is still alive.
>>>
>>> Thanks, Ashish
>>>
>>> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <
>>> lassenederga...@gmail.com> wrote:
>>>
>>> Hi.
>>>
>>> Did you find a reason for the detaching?
>>> I sometimes see the same on our system running Flink 1.4 on DC/OS. I
>>> have enabled taskmanager.debug.memory.startLogThread for debugging.
>>>
>>> Med venlig hilsen / Best regards
>>> Lasse Nedergaard
>>>
>>>
>>> Den 20. jan. 2018 kl. 12.57 skrev Kien Truong <duckientru...@gmail.com>:
>>>
>>> Hi,
>>>
>>> You should enable and check your garbage collection log.
>>>
>>> We've encountered cases where the Task Manager disassociated due to long GC
>>> pauses.
>>>
>>>
>>> Regards,
>>>
>>> Kien
>>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>>
>>> Hi All,
>>>
>>> We have hit some load-related issues and I was wondering if anyone has
>>> some suggestions. We are noticing task managers and job managers becoming
>>> detached from each other under load and never really syncing up again. As a
>>> result, the Flink session shows 0 slots available for processing. Even
>>> though the apps are configured to restart, it isn't really helping as there
>>> are no slots available to run the apps.
>>>
>>>
>>> Here are excerpts from the logs that seemed relevant. (I am trimming out
>>> the rest of the logs for brevity.)
>>>
>>> *Job Manager:*
>>> 2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>
>>> 2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Maximum heap size: 16384 MiBytes
>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Hadoop version: 2.6.5
>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  JVM Options:
>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xms16384m
>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xmx16384m
>>> 2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -XX:+UseG1GC
>>>
>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 16384
>>>
>>> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher                     - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
>>> 2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.
>>>
>>> -- So once the Flink session boots up, we are hitting it with pretty heavy
>>> load, which typically results in the WARN above.
>>>
>>> *Task Manager:*
>>> 2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Hadoop version: 2.6.5
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JVM Options:
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xms16384M
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xmx16384M
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:MaxDirectMemorySize=8388607T
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:+UseG1GC
>>>
>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 16384
>>>
>>> 2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher                     - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
>>> 2018-01-19 12:54:48,690 INFO  akka.remote.Remoting                          - Quarantined address [akka.tcp://flink@<jm-host>:6123] is still unreachable or has not been restarted. Keeping it quarantined.
>>> 2018-01-19 12:54:48,774 WARN  akka.remote.Remoting                          - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
>>> 2018-01-19 12:54:48,833 WARN  akka.remote.Remoting                          - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>> <bunch of ERRORs on operations not shutdown properly - assuming because JM is unreachable>
>>>
>>> 2018-01-19 12:56:51,244 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
>>> 2018-01-19 12:56:51,253 WARN  akka.remote.Remoting                          - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>
>>> So the bottom line is that the JM and TM couldn't communicate under load,
>>> which is obviously not good. I tried to bump up akka.tcp.timeout as well,
>>> but it didn't help either. So my question here is: after all processing is
>>> halted and there is no new data being picked up, shouldn't this environment
>>> self-heal? Any other things I can be looking at other than extending
>>> timeouts?
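>>> (For reference, a sketch of how such timeouts would be bumped in
>>> flink-conf.yaml; the values below are placeholders, not the ones used here:)
>>>
>>>     akka.ask.timeout: 60 s
>>>     akka.tcp.timeout: 60 s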
>>>
>>> Thanks,
>>>
>>> Ashish
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>
