[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940707#comment-15940707 ]

Razvan commented on FLINK-6063:
---

Hi, it works with a DFS for me, though I'd suggest the documentation state more prominently that a DFS is required for HA. Thank you for the help!

> HA Configuration doesn't work with Flink 1.2
>
>
> Key: FLINK-6063
> URL: https://issues.apache.org/jira/browse/FLINK-6063
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.2.0
> Reporter: Razvan
> Priority: Critical
> Attachments: flink-conf.yaml, Logs.tar.gz, masters, slaves, zoo.cfg
>
>
> I have a Flink 1.2 cluster made up of 3 JobManagers and 2 TaskManagers. I start the ZooKeeper quorum from JobManager1, get confirmation that ZooKeeper starts on the other 2 JobManagers, and then start a Flink job on JobManager1.
>
> The flink-conf.yaml is the same on all 5 VMs (as is everything else related to Flink, because I copied the folder to all VMs as suggested in tutorials), which means jobmanager.rpc.address points to JobManager1 everywhere.
> If I turn off the VM running JobManager1, I would expect ZooKeeper to report one of the remaining JobManagers as the leader and the TaskManagers to reconnect to it. Instead, a new leader is elected but the slaves keep connecting to the old master:
> 2017-03-15 10:28:28,655 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Flat Map (1/1)
> 2017-03-15 10:28:38,534 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> 2017-03-15 10:28:46,606 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:28:52,431 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:02,435 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:10,489 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
> 2017-03-15 10:29:10,490 INFO org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
> 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
> 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
>     at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
>     at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at
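The behavior the reporter expects can be modeled roughly as follows. This is a hypothetical sketch, not Flink's TaskManager code: in ZooKeeper HA mode, the address a TaskManager registers with should come from the leader announcements produced by the election, not from the static `jobmanager.rpc.address`. The names `LeaderTrackingSketch`, `TaskManagerModel` and `onLeaderAnnounced` are invented for illustration, and the two addresses are taken from logs in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink code: a TaskManager in ZooKeeper HA mode follows the
// leader address announced by leader election instead of jobmanager.rpc.address.
public class LeaderTrackingSketch {

    static final class TaskManagerModel {
        private String currentLeader; // null until a leader has been announced
        private final List<String> disconnectedFrom = new ArrayList<>();

        /** Called whenever the election announces a (possibly new) leader address. */
        void onLeaderAnnounced(String leaderAddress) {
            if (currentLeader != null && !currentLeader.equals(leaderAddress)) {
                // The old leader lost leadership: remember it and switch to the new one.
                disconnectedFrom.add(currentLeader);
            }
            currentLeader = leaderAddress;
        }

        /** Address to register with; empty until a leader is known. */
        String registrationTarget() {
            return currentLeader == null ? "" : currentLeader;
        }

        List<String> disconnectedFrom() {
            return disconnectedFrom;
        }
    }

    public static void main(String[] args) {
        TaskManagerModel tm = new TaskManagerModel();
        tm.onLeaderAnnounced("akka.tcp://flink@1.2.3.4:44779/user/jobmanager");
        // JobManager1's VM is turned off and another JobManager wins the election.
        tm.onLeaderAnnounced("akka.tcp://flink@1.2.3.5:34987/user/jobmanager");
        System.out.println("register with: " + tm.registrationTarget());
        System.out.println("left behind:   " + tm.disconnectedFrom());
    }
}
```

The point of the model is that the static address plays no role once leader announcements arrive, which is what the later comments in this thread confirm the real TaskManagers do.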
[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932962#comment-15932962 ]

Till Rohrmann commented on FLINK-6063:
--

Hi Razvan, yes, it is necessary to configure a DFS to be able to retrieve the old jobs. Otherwise, the new leader won't find them. Have you tried whether it works with a DFS?
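Till's point is that the new leader recovers jobs from the HA storage directory, while ZooKeeper only holds pointers to that state. The directory therefore has to be readable from every JobManager, and a `file://` path on JobManager1's local disk is not. The following is a hypothetical pre-flight check: the class name and the list of "shared" schemes are assumptions, not part of Flink. In Flink 1.2 the directory is, to our understanding, configured via `high-availability.zookeeper.storageDir`; check the documentation for your version.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical pre-flight check (not part of Flink): is the HA storage directory on
// storage that every JobManager can read?
public class HaStorageCheck {

    // Assumption: schemes treated as shared. The check is conservative: a shared NFS mount
    // used via file:// would also work for HA, but this check cannot tell it from a local disk.
    private static final Set<String> SHARED_SCHEMES =
            new HashSet<>(Arrays.asList("hdfs", "s3", "s3a", "maprfs"));

    static boolean isSharedStorage(String storageDir) {
        String scheme = URI.create(storageDir).getScheme(); // null for a bare path like /tmp/x
        return scheme != null && SHARED_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        String[] candidates = {"file:///tmp/flink/recovery", "/tmp/flink/recovery", "hdfs:///flink/recovery"};
        for (String dir : candidates) {
            System.out.println(dir + " -> " + (isSharedStorage(dir) ? "shared" : "NOT shared (local to one machine?)"));
        }
    }
}
```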
[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929644#comment-15929644 ]

Razvan commented on FLINK-6063:
---

[~till.rohrmann] Hi, I just had a chat with [~dawidwys] and he pointed out that the issue might be that I'm using the local filesystem instead of a distributed FS. Could that be it?
[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928047#comment-15928047 ]

Razvan commented on FLINK-6063:
---

Sure, here:

```
id 0x15ad68d898c0005
2017-03-16 09:58:17,319 INFO org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /1.2.3.5:53748
2017-03-16 09:58:17,320 INFO org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /1.2.3.5:53748
2017-03-16 09:58:17,322 INFO org.apache.zookeeper.server.ZooKeeperServer - Established session 0x15ad68d898c0006 with negotiated timeout 4 for client /1.2.3.5:53748
2017-03-16 09:58:18,336 INFO org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.5:53748 which had sessionid 0x15ad68d898c0006
2017-03-16 10:10:23,881 WARN org.apache.zookeeper.server.NIOServerCnxn - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x15ad68d898c0001, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)
2017-03-16 10:10:23,885 INFO org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.4:45752 which had sessionid 0x15ad68d898c0001
2017-03-16 10:10:23,885 WARN org.apache.zookeeper.server.NIOServerCnxn - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x15ad68d898c0002, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)
2017-03-16 10:10:23,887 INFO org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.4:45754 which had sessionid 0x15ad68d898c0002
2017-03-16 13:11:21,841 INFO org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /[Client 2 IP here]:57692
2017-03-16 13:11:34,787 INFO org.apache.zookeeper.server.ZooKeeperServer - Client attempting to renew session 0x35ad68d8b4d0005 at /[Client 2 IP here]:57692
2017-03-16 13:11:34,787 INFO org.apache.zookeeper.server.quorum.Learner - Revalidating client: 0x35ad68d8b4d0005
2017-03-16 13:11:34,788 INFO org.apache.zookeeper.server.ZooKeeperServer - Invalid session 0x35ad68d8b4d0005 for client /[Client 2 IP here]:57692, probably expired
2017-03-16 13:11:34,789 INFO org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /[Client 2 IP here]:57692 which had sessionid 0x35ad68d8b4d0005
```
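The `Invalid session ... probably expired` entry means the client came back only after its ZooKeeper session had already timed out. A session outlives a disconnect only for its negotiated timeout, and the server clamps the timeout the client requests into the range [minSessionTimeout, maxSessionTimeout]. Unless zoo.cfg overrides them, those bounds default to 2 * tickTime and 20 * tickTime. The sketch below shows that clamping. The `tickTime` of 2000 ms and the requested 60000 ms are assumed example values, not read from the attached zoo.cfg.

```java
// Sketch of how a ZooKeeper server clamps a client's requested session timeout.
public class ZkSessionTimeout {

    static int negotiatedTimeoutMs(int requestedMs, int minSessionTimeoutMs, int maxSessionTimeoutMs) {
        int t = requestedMs;
        if (t < minSessionTimeoutMs) t = minSessionTimeoutMs;
        if (t > maxSessionTimeoutMs) t = maxSessionTimeoutMs;
        return t;
    }

    /** Bounds used when zoo.cfg sets neither minSessionTimeout nor maxSessionTimeout. */
    static int negotiatedWithDefaults(int requestedMs, int tickTimeMs) {
        return negotiatedTimeoutMs(requestedMs, 2 * tickTimeMs, 20 * tickTimeMs);
    }

    public static void main(String[] args) {
        int tickTimeMs = 2000;   // assumed example value
        int requestedMs = 60000; // assumed example client request
        System.out.println(negotiatedWithDefaults(requestedMs, tickTimeMs)); // 40000
    }
}
```

With these example values the client asks for 60 s but gets 40 s. A client that is gone longer than that will find its session expired on reconnect, as in the last lines of the log.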
[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928030#comment-15928030 ]

Till Rohrmann commented on FLINK-6063:
--

Hi Razvan, it would be interesting to see the complete logs of the new leader {{JobManager}} to see what happens after it gains leadership.
[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927872#comment-15927872 ]

Razvan commented on FLINK-6063:
---

Below is the code for the job (main class):

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.booxware.event.reader.SimpleEventReader;

public class StreamingAccuracyJob {

    private final static String JOB_NAME = "Prototype CEP";

    final static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    private static void configureEnvironment() {
        // start a checkpoint every 1 ms
        env.enableCheckpointing(1);

        // advanced options:
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // make sure 5000 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);

        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                50,                           // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay between attempts
        ));

        env.setParallelism(1);
        // env.setMaxParallelism(1); // https://issues.apache.org/jira/browse/FLINK-5773

        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
    }

    public static void main(String[] args) throws Exception {
        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
                KafkaConfiguration.TOPIC_NAME,
                new SimpleStringSchema(),
                KafkaConfiguration.getConnectionProperties()));

        configureEnvironment();

        DataStream streamTuples = stream.flatMap(new JsonToTupleFlatMap());
        streamTuples.keyBy(SimpleEventReader.FIELD_USERID_TUPLE_POSITION).flatMap(new AccuracyTestEvents());

        env.execute(JOB_NAME);
    }
}
```
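With the settings in this job, the posted 1 ms checkpoint interval is effectively superseded by the 5000 ms minimum pause. The sketch below uses a simplified model of that interaction. It is an assumption for illustration, not Flink's actual scheduler code. With at most one concurrent checkpoint, the next one may start only once the interval has elapsed since the last trigger and the minimum pause has elapsed since the last completion. `CheckpointSpacing` is a hypothetical name.

```java
// Simplified model (assumption, not Flink's scheduler): when may the next checkpoint start,
// given setMaxConcurrentCheckpoints(1)?
public class CheckpointSpacing {

    static long earliestNextTriggerMs(long lastTriggerMs, long lastCompletionMs,
                                      long intervalMs, long minPauseMs) {
        long byInterval = lastTriggerMs + intervalMs;    // periodic schedule from enableCheckpointing
        long byMinPause = lastCompletionMs + minPauseMs; // setMinPauseBetweenCheckpoints
        return Math.max(byInterval, byMinPause);
    }

    public static void main(String[] args) {
        long intervalMs = 1;    // env.enableCheckpointing(1), as posted
        long minPauseMs = 5000; // setMinPauseBetweenCheckpoints(5000)
        // A checkpoint triggered at t = 0 ms that completes at t = 200 ms:
        System.out.println(earliestNextTriggerMs(0, 200, intervalMs, minPauseMs)); // 5200
    }
}
```

Under this model the job checkpoints roughly every 5 s plus the checkpoint duration, regardless of the 1 ms interval.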
[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927844#comment-15927844 ]

Razvan commented on FLINK-6063:
---

Hi Dawid, sorry to say, but it's not fine: as you can see from the logs, the job is not resumed, and that was the whole purpose of HA. Also, the TaskManagers connecting to the old master shows that they didn't acknowledge the new leader.
[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927836#comment-15927836 ] Dawid Wysakowicz commented on FLINK-6063: - Hi Razvan, I've had a look at your logs and I think the TMs do connect to the new leader JM. See those lines: ``` 2017-03-16 10:11:04,020 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:45164/user/jobmanager: Old JobManager lost its leadership. 2017-03-16 10:11:04,020 INFO org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager 2017-03-16 10:11:04,025 INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache 2017-03-16 10:11:04,042 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@1.2.3.5:34987/user/jobmanager (attempt 1, timeout: 500 milliseconds) 2017-03-16 10:11:04,174 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://flink@1.2.3.5:34987/user/jobmanager), starting network stack and library cache. 2017-03-16 10:11:04,174 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be /1.2.3.5:42030. Starting BLOB cache. 2017-03-16 10:11:04,175 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-92bf7fe1-bab0-498c-90bf-6ec44ec6cb1e ``` The further logs with failed association to old remote system are due to Akka system and have no impact on Flink cluster (if I am wrong [~till.rohrmann] please correct me). Anyway after some time they stop retrying and all communication happen between TMs and the new leader. Try to open webUI before and after killing JM, you will see that you are redirected to proper JMs and TMs will be connected correctly in both cases. I believe everything is working fine, just maybe the logs could be a liitle bit clearer. 
> HA Configuration doesn't work with Flink 1.2
>
> Key: FLINK-6063
> URL: https://issues.apache.org/jira/browse/FLINK-6063
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.2.0
> Reporter: Razvan
> Priority: Critical
>
> I have a setup with flink 1.2 cluster, made up of 3 JobManagers and 2 TaskManagers. I start the Zookeeper Quorum from JobManager1, I get confirmation Zookeeper starts on the other 2 JobManagers then I start a Flink job on this JobManager1.
>
> The flink-conf.yaml is the same on all 5 VMs (also everything else related to flink because I copied the folder across all VMs as suggested in tutorials) this means jobmanager.rpc.address: points to JobManager1 everywhere.
> If I turn off the VM running JobManager1 I would expect Zookeeper to say one of the remaining JobManagers is the leader and the TaskManagers should reconnect to it. Instead a new leader is elected but the slaves keep connecting to the old master
> 2017-03-15 10:28:28,655 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Flat Map (1/1)
> 2017-03-15 10:28:38,534 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> 2017-03-15 10:28:46,606 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:28:52,431 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:02,435 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:10,489 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
> 2017-03-15 10:29:10,490 INFO
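The quoted log also shows how long the TaskManager kept retrying the old JobManager before it learned about the leadership change. The sketch below measures that gap from the timestamps. It assumes the `yyyy-MM-dd HH:mm:ss,SSS` prefix seen in these lines, and the helper names `parse_timestamp` and `seconds_between` are hypothetical. Run on the quoted lines, it gives roughly 32 seconds (31.955 s) between the first failed Akka association and the "Old JobManager lost its leadership" message.

```python
from datetime import datetime

# Timestamp prefix used in the quoted log, e.g. "2017-03-15 10:28:38,534" (23 chars).
TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"

# Subset of the quoted TaskManager log lines, copied verbatim.
SAMPLE = [
    "2017-03-15 10:28:38,534 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Disassociated]",
    "2017-03-15 10:29:02,435 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]",
    "2017-03-15 10:29:10,489 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.",
]


def parse_timestamp(line: str) -> datetime:
    """Parse the leading timestamp; %f reads the 3-digit milliseconds as a fraction."""
    return datetime.strptime(line[:23], TS_FORMAT)


def seconds_between(lines, start_marker: str, end_marker: str) -> float:
    """Seconds from the first line containing start_marker to the next line containing end_marker."""
    start = None
    for line in lines:
        if start is None and start_marker in line:
            start = parse_timestamp(line)
        elif start is not None and end_marker in line:
            return (parse_timestamp(line) - start).total_seconds()
    raise ValueError("markers not found in the expected order")


if __name__ == "__main__":
    delay = seconds_between(SAMPLE, "Association with remote system", "Old JobManager lost its leadership")
    print(f"TaskManager learned about the new leader after {delay:.3f} s")
```

The sketch only measures what the log shows. The length of this window likely depends on the ZooKeeper session and Akka timeouts in the cluster's configuration.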
[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927786#comment-15927786 ] Razvan commented on FLINK-6063: ---

Hi Till, thanks for replying. Sure, I can attach the logs you mentioned.

Cluster configuration: Standalone cluster with JobManager at /1.2.3.4:44307
Using address 1.2.3.4:44307 to connect to JobManager.
JobManager web interface address http://1.2.3.4:8081
Starting execution of program
Submitting job with JobID: 2c64b1126f327261b0c43f33f3cf43ee. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@1.2.3.4:44307/user/jobmanager#2001981191]
03/16/2017 09:40:10 Job execution switched to status RUNNING.
03/16/2017 09:40:10 Source: Custom Source -> Flat Map(1/1) switched to SCHEDULED
03/16/2017 09:40:10 Source: Custom Source -> Flat Map(1/1) switched to DEPLOYING
03/16/2017 09:40:10 Flat Map(1/1) switched to SCHEDULED
03/16/2017 09:40:10 Flat Map(1/1) switched to DEPLOYING
03/16/2017 09:40:10 Flat Map(1/1) switched to RUNNING
03/16/2017 09:40:10 Source: Custom Source -> Flat Map(1/1) switched to RUNNING
New JobManager elected. Connecting to null
Connected to JobManager at Actor[akka.tcp://flink@1.2.3.5:43828/user/jobmanager#1400235434]

Killed JobManager:

2017-03-16 09:58:14,953 INFO org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /[Client 1 IP here]:40858
2017-03-16 09:58:14,953 INFO org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /[Client 1 IP here]:40858
2017-03-16 09:58:14,957 INFO org.apache.zookeeper.server.ZooKeeperServer - Established session 0x35ad68d8b4d0004 with negotiated timeout 4 for client /[Client 1 IP here]:40858
2017-03-16 09:58:15,523 INFO org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /[Client 2 IP here]:40276
2017-03-16 09:58:15,528 INFO org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /[Client 2 IP here]:40276
2017-03-16 09:58:15,531 INFO org.apache.zookeeper.server.ZooKeeperServer - Established session 0x35ad68d8b4d0005 with negotiated timeout 4 for client /[Client 2 IP here]:40276
2017-03-16 10:10:25,118 WARN org.apache.zookeeper.server.NIOServerCnxn - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x35ad68d8b4d0002, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)
2017-03-16 10:10:25,120 INFO org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.4:47872 which had sessionid 0x35ad68d8b4d0002

New Leader:

2017-03-16 09:58:17,319 INFO org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /1.2.3.5:53748
2017-03-16 09:58:17,320 INFO org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /1.2.3.5:53748
2017-03-16 09:58:17,322 INFO org.apache.zookeeper.server.ZooKeeperServer - Established session 0x15ad68d898c0006 with negotiated timeout 4 for client /1.2.3.5:53748
2017-03-16 09:58:18,336 INFO org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.5:53748 which had sessionid 0x15ad68d898c0006
2017-03-16 10:10:23,881 WARN org.apache.zookeeper.server.NIOServerCnxn - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x15ad68d898c0001, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)
2017-03-16 10:10:23,885 INFO org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.4:45752 which had sessionid 0x15ad68d898c0001
2017-03-16 10:10:23,885 WARN org.apache.zookeeper.server.NIOServerCnxn - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x15ad68d898c0002, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)
2017-03-16 10:10:23,887 INFO org.apache.zookeeper.server.NIOServerCnxn -
[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926682#comment-15926682 ] Till Rohrmann commented on FLINK-6063: --

Hi Razvan,

sometimes it can take a little while until ZooKeeper notifies the TaskManagers about the new leader. In the meantime, they keep trying to reconnect to the old master. But as soon as the new leader information is written to ZooKeeper and sent to the TaskManagers, they should try to connect to the new leader. How long did you wait? It would also be helpful to see the log of the JobManager that became the new leader, as well as more of the TaskManager logs.

> HA Configuration doesn't work with Flink 1.2
>
> Key: FLINK-6063
> URL: https://issues.apache.org/jira/browse/FLINK-6063
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.2.0
> Reporter: Razvan
> Priority: Critical
>
> I have a setup with flink 1.2 cluster, made up of 3 JobManagers and 2 TaskManagers. I start the Zookeeper Quorum from JobManager1, I get confirmation Zookeeper starts on the other 2 JobManagers then I start a Flink job on this JobManager1.
>
> The flink-conf.yaml is the same on all 5 VMs (also everything else related to flink because I copied the folder across all VMs as suggested in tutorials) this means jobmanager.rpc.address: points to JobManager1 everywhere.
> If I turn off the VM running JobManager1 I would expect Zookeeper to say one of the remaining JobManagers is the leader and the TaskManagers should reconnect to it. Instead a new leader is elected but the slaves keep connecting to the old master
> 2017-03-15 10:28:28,655 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Flat Map (1/1)
> 2017-03-15 10:28:38,534 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> 2017-03-15 10:28:46,606 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:28:52,431 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:02,435 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:10,489 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
> 2017-03-15 10:29:10,490 INFO org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
> 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
> 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
>     at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
>     at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>
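Till's description of the failover sequence can be illustrated with a toy, in-process model. In that sequence, the TaskManager keeps retrying the old master until ZooKeeper delivers the new leader, then registers with the new one. The model below is a sketch under stated assumptions. `LeaderStore`, `TaskManagerModel`, and `simulate_failover` are hypothetical names. There is no real ZooKeeper, Akka, networking, or timing involved, and the model does not reproduce Flink's actual implementation.

```python
from typing import Callable, List, Optional, Set


class LeaderStore:
    """Hypothetical stand-in for the leader information kept in ZooKeeper (not Flink's API)."""

    def __init__(self) -> None:
        self._address: Optional[str] = None
        self._listeners: List[Callable[[str], None]] = []

    def subscribe(self, listener: Callable[[str], None]) -> None:
        self._listeners.append(listener)
        if self._address is not None:
            listener(self._address)  # late subscribers learn the current leader immediately

    def publish(self, address: str) -> None:
        """Record a newly elected leader and notify every subscriber."""
        self._address = address
        for listener in self._listeners:
            listener(address)


class TaskManagerModel:
    """Toy TaskManager: keeps its registration until it is told about a new leader."""

    def __init__(self, store: LeaderStore, reachable: Set[str]) -> None:
        self.reachable = reachable  # JobManager addresses that currently accept connections
        self.leader: Optional[str] = None
        self.registered_with: Optional[str] = None
        self.log: List[str] = []
        store.subscribe(self.on_leader_change)

    def on_leader_change(self, address: str) -> None:
        if self.registered_with is not None and self.registered_with != address:
            self.log.append(f"disconnect from {self.registered_with}: old leader lost its leadership")
            self.registered_with = None
        self.leader = address

    def tick(self) -> None:
        """One retry round: contact the current JobManager, or register with the known leader."""
        target = self.registered_with or self.leader
        if target is None:
            return
        if target not in self.reachable:
            self.log.append(f"association with {target} failed")  # like the Akka gating warnings
        elif self.registered_with is None:
            self.registered_with = target
            self.log.append(f"registered at {target}")


def simulate_failover(rounds_before_notification: int) -> TaskManagerModel:
    store = LeaderStore()
    reachable = {"jm1", "jm2"}
    tm = TaskManagerModel(store, reachable)
    store.publish("jm1")
    tm.tick()                 # registers at jm1
    reachable.discard("jm1")  # the VM running JobManager1 is switched off
    for _ in range(rounds_before_notification):
        tm.tick()             # keeps retrying jm1 until the new leader is delivered
    store.publish("jm2")      # new leader information reaches the TaskManager
    tm.tick()                 # registers at jm2
    return tm


if __name__ == "__main__":
    print("\n".join(simulate_failover(2).log))
```

In this model, a longer notification delay only adds more "association failed" lines. The TaskManager still ends up registered at `jm2`, which is the same pattern Dawid pointed out in the real logs.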