[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-24 Thread Razvan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940707#comment-15940707
 ] 

Razvan commented on FLINK-6063:
---

Hi, it works with a DFS for me, though I'd emphasize more strongly in the 
documentation that a DFS is required for HA. Thank you for the help!

> HA Configuration doesn't work with Flink 1.2
> --------------------------------------------
>
>                 Key: FLINK-6063
>                 URL: https://issues.apache.org/jira/browse/FLINK-6063
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.2.0
>            Reporter: Razvan
>            Priority: Critical
>         Attachments: flink-conf.yaml, Logs.tar.gz, masters, slaves, zoo.cfg
>
>
> I have a Flink 1.2 cluster made up of 3 JobManagers and 2 TaskManagers. I 
> start the ZooKeeper quorum from JobManager1, get confirmation that ZooKeeper 
> has started on the other 2 JobManagers, and then start a Flink job on 
> JobManager1.
>
> The flink-conf.yaml is the same on all 5 VMs (as is everything else related 
> to Flink, because I copied the folder across all VMs as suggested in the 
> tutorials). This means jobmanager.rpc.address points to JobManager1 
> everywhere.
>
> If I turn off the VM running JobManager1, I would expect ZooKeeper to report 
> one of the remaining JobManagers as the leader and the TaskManagers to 
> reconnect to it. Instead, a new leader is elected but the slaves keep 
> connecting to the old master:
> 2017-03-15 10:28:28,655 INFO  org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Flat Map (1/1)
> 2017-03-15 10:28:38,534 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> 2017-03-15 10:28:46,606 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:28:52,431 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:02,435 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:10,489 INFO  org.apache.flink.runtime.taskmanager.TaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
> 2017-03-15 10:29:10,490 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
> 2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
> 2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
>   at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
>   at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
>   at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
>   at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>   at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at 

[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-20 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932962#comment-15932962
 ] 

Till Rohrmann commented on FLINK-6063:
--

Hi Razvan, yes it is necessary to configure a DFS to be able to retrieve the 
old jobs. Otherwise the new leader won't find the old jobs. Have you tried 
whether it works with a DFS?
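
As a rough illustration of the point above, here is a minimal sketch of a sanity check for the HA storage directory. It assumes the directory is given as a URI string (in Flink 1.2 this is, as far as I recall, the `high-availability.zookeeper.storageDir` entry in flink-conf.yaml; please verify the key name for your version). The class `HaStorageCheck` and the method `looksShared` are hypothetical helpers, not part of Flink, and the check is only a heuristic on the URI scheme.

```java
import java.net.URI;

// Hypothetical helper (not part of Flink): flags HA storage directories that
// point at a node-local filesystem, which a newly elected JobManager on a
// different machine cannot read.
class HaStorageCheck {

    // Returns true if the URI scheme suggests storage reachable from every
    // JobManager (e.g. hdfs://, s3://). A missing scheme or file:// is treated
    // as local. This is a heuristic only: a file:// path on a shared mount
    // (for example NFS) would be reported as local here.
    static boolean looksShared(String storageDir) {
        String scheme = URI.create(storageDir.trim()).getScheme();
        return scheme != null && !"file".equalsIgnoreCase(scheme);
    }

    public static void main(String[] args) {
        String[] candidates = {
            "hdfs:///flink/recovery",      // example path, assumption
            "file:///tmp/flink/recovery",  // example path, assumption
            "/tmp/flink/recovery"          // example path, assumption
        };
        for (String dir : candidates) {
            System.out.println(dir + " -> " + (looksShared(dir) ? "shared (by scheme)" : "local"));
        }
    }
}
```

With a local directory, each JobManager would only see the recovery data it wrote itself, which matches the symptom that the new leader does not find the old jobs.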

[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-17 Thread Razvan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929644#comment-15929644
 ] 

Razvan commented on FLINK-6063:
---

[~till.rohrmann] Hi, I just had a chat with [~dawidwys] and he pointed out that 
the issue might be that I'm using a local filesystem instead of a distributed 
FS. Could that be it?

[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-16 Thread Razvan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928047#comment-15928047
 ] 

Razvan commented on FLINK-6063:
---

Sure, here they are:

id 0x15ad68d898c0005
2017-03-16 09:58:17,319 INFO  org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /1.2.3.5:53748
2017-03-16 09:58:17,320 INFO  org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /1.2.3.5:53748
2017-03-16 09:58:17,322 INFO  org.apache.zookeeper.server.ZooKeeperServer - Established session 0x15ad68d898c0006 with negotiated timeout 40000 for client /1.2.3.5:53748
2017-03-16 09:58:18,336 INFO  org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.5:53748 which had sessionid 0x15ad68d898c0006
2017-03-16 10:10:23,881 WARN  org.apache.zookeeper.server.NIOServerCnxn - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x15ad68d898c0001, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)
2017-03-16 10:10:23,885 INFO  org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.4:45752 which had sessionid 0x15ad68d898c0001
2017-03-16 10:10:23,885 WARN  org.apache.zookeeper.server.NIOServerCnxn - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x15ad68d898c0002, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)
2017-03-16 10:10:23,887 INFO  org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.4:45754 which had sessionid 0x15ad68d898c0002
2017-03-16 13:11:21,841 INFO  org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /[Client 2 IP here]:57692
2017-03-16 13:11:34,787 INFO  org.apache.zookeeper.server.ZooKeeperServer - Client attempting to renew session 0x35ad68d8b4d0005 at /[Client 2 IP here]:57692
2017-03-16 13:11:34,787 INFO  org.apache.zookeeper.server.quorum.Learner - Revalidating client: 0x35ad68d8b4d0005
2017-03-16 13:11:34,788 INFO  org.apache.zookeeper.server.ZooKeeperServer - Invalid session 0x35ad68d8b4d0005 for client /[Client 2 IP here]:57692, probably expired
2017-03-16 13:11:34,789 INFO  org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /[Client 2 IP here]:57692 which had sessionid 0x35ad68d8b4d0005
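
When reading ZooKeeper server logs like these, the entries worth looking for are the ones where a client tries to renew a session that the server already considers expired. The following is a minimal sketch that pulls those session IDs out of a log, assuming each log entry sits on a single line and uses the "Invalid session ... for client" wording seen above. The class `ZkSessionScan` and the method `expiredSessions` are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log helper: collects session IDs that the ZooKeeper server
// rejected as invalid (probably expired).
class ZkSessionScan {

    // Matches the message format shown in the log excerpt above.
    private static final Pattern INVALID_SESSION =
            Pattern.compile("Invalid session (0x[0-9a-fA-F]+) for client");

    static List<String> expiredSessions(List<String> logLines) {
        List<String> ids = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = INVALID_SESSION.matcher(line);
            if (m.find()) {
                ids.add(m.group(1));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "2017-03-16 09:58:17,322 INFO  org.apache.zookeeper.server.ZooKeeperServer - Established session 0x15ad68d898c0006 with negotiated timeout 40000 for client /1.2.3.5:53748",
            "2017-03-16 13:11:34,788 INFO  org.apache.zookeeper.server.ZooKeeperServer - Invalid session 0x35ad68d8b4d0005 for client /[Client 2 IP here]:57692, probably expired");
        System.out.println(expiredSessions(lines));
    }
}
```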


[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-16 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928030#comment-15928030
 ] 

Till Rohrmann commented on FLINK-6063:
--

Hi Razvan,

it would be interesting to see the complete logs of the new leader 
{{JobManager}} to see what happens after it gains the leadership.

[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-16 Thread Razvan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927872#comment-15927872
 ] 

Razvan commented on FLINK-6063:
---

Below is the code for the job (main class):


import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.booxware.event.reader.SimpleEventReader;

public class StreamingAccuracyJob {

    private final static String JOB_NAME = "Prototype CEP";

    final static StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    private static void configureEnvironment() {
        // start a checkpoint every 1 ms
        env.enableCheckpointing(1);

        // advanced options:

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // make sure 5000 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);

        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                50,                            // number of restart attempts
                Time.of(10, TimeUnit.SECONDS)  // delay
        ));

        env.setParallelism(1);
        // env.setMaxParallelism(1);
        // https://issues.apache.org/jira/browse/FLINK-5773

        // keep externalized checkpoints only until the job is cancelled
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
    }

    public static void main(String[] args) throws Exception {

        // read raw JSON strings from Kafka
        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
                KafkaConfiguration.TOPIC_NAME,
                new SimpleStringSchema(),
                KafkaConfiguration.getConnectionProperties()));

        configureEnvironment();

        // convert each JSON string into a tuple
        DataStream streamTuples = stream.flatMap(new JsonToTupleFlatMap());

        // key by user ID and run the accuracy test
        streamTuples.keyBy(SimpleEventReader.FIELD_USERID_TUPLE_POSITION)
                .flatMap(new AccuracyTestEvents());

        env.execute(JOB_NAME);
    }

}


[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-16 Thread Razvan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927844#comment-15927844
 ] 

Razvan commented on FLINK-6063:
---

Hi Dawid,

  Sorry to say, but it's not fine. As you can see from the logs, the job is not 
resumed, which is the whole purpose of HA. Also, the TaskManagers connecting to 
the old master shows that they didn't acknowledge the new leader.

[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-16 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927836#comment-15927836
 ] 

Dawid Wysakowicz commented on FLINK-6063:
-

Hi Razvan, 

I've had a look at your logs, and I think the TMs do connect to the new leader 
JM. See these lines:

```
2017-03-16 10:11:04,020 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:45164/user/jobmanager: Old JobManager lost its leadership.
2017-03-16 10:11:04,020 INFO org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager
2017-03-16 10:11:04,025 INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
2017-03-16 10:11:04,042 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@1.2.3.5:34987/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2017-03-16 10:11:04,174 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://flink@1.2.3.5:34987/user/jobmanager), starting network stack and library cache.
2017-03-16 10:11:04,174 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be /1.2.3.5:42030. Starting BLOB cache.
2017-03-16 10:11:04,175 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-92bf7fe1-bab0-498c-90bf-6ec44ec6cb1e
```

The later log entries about failed associations with the old remote system 
come from the Akka system and have no impact on the Flink cluster (if I am 
wrong, [~till.rohrmann], please correct me). After some time the retries stop 
and all communication happens between the TMs and the new leader. Try opening 
the web UI before and after killing the JM: you will see that you are 
redirected to the proper JM, and the TMs are connected correctly in both cases.

I believe everything is working fine; the logs could just be a little bit 
clearer.
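
To check for the same sequence in another TaskManager log, a minimal Python sketch along these lines might help. It is only an illustration: the function names `join_entries` and `failover_summary` are made up here and are not part of Flink, and the patterns assume the exact message wording and timestamp layout shown in the excerpt above. It joins wrapped lines, finds the "Old JobManager lost its leadership" entry, and reports how long it took until the next successful registration.

```python
import re
from datetime import datetime

# Entry prefix and message wording as they appear in the excerpt above (assumed stable).
ENTRY_START = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) ")
LOST = re.compile(r"disconnects from JobManager (\S+?): Old JobManager lost its leadership")
REGISTERED = re.compile(r"Successful registration at JobManager \(([^)\s]+)\)")


def join_entries(lines):
    """Merge wrapped continuation lines into one string per log entry."""
    entries = []
    for raw in lines:
        line = raw.strip()
        if not line:
            continue
        if ENTRY_START.match(line) or not entries:
            entries.append(line)
        else:
            entries[-1] += " " + line
    return entries


def failover_summary(lines):
    """Return (old_jm, new_jm, seconds) for the first leader change, or None."""
    old_jm = lost_at = None
    for entry in join_entries(lines):
        start = ENTRY_START.match(entry)
        if start is None:
            continue
        ts = datetime.strptime(start.group(1), "%Y-%m-%d %H:%M:%S,%f")
        lost = LOST.search(entry)
        if lost and old_jm is None:
            old_jm, lost_at = lost.group(1), ts
            continue
        registered = REGISTERED.search(entry)
        if registered and old_jm is not None:
            return old_jm, registered.group(1), (ts - lost_at).total_seconds()
    return None


if __name__ == "__main__":
    sample = """\
2017-03-16 10:11:04,020 INFO org.apache.flink.runtime.taskmanager.TaskManager -
TaskManager akka://flink/user/taskmanager disconnects from JobManager
akka.tcp://flink@1.2.3.4:45164/user/jobmanager: Old JobManager lost its
leadership.
2017-03-16 10:11:04,174 INFO org.apache.flink.runtime.taskmanager.TaskManager -
Successful registration at JobManager
(akka.tcp://flink@1.2.3.5:34987/user/jobmanager), starting network stack and
library cache.
""".splitlines()
    print(failover_summary(sample))
```

On the lines quoted above, the gap between losing the old leader and registering at the new one is 0.154 seconds. The Akka warnings about the old address are not considered by this sketch at all.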


[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-16 Thread Razvan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927786#comment-15927786
 ] 

Razvan commented on FLINK-6063:
---

Hi Till, thanks for replying. Sure, I can attach the logs you mentioned.


Cluster configuration: Standalone cluster with JobManager at /1.2.3.4:44307
Using address 1.2.3.4:44307 to connect to JobManager.
JobManager web interface address http://1.2.3.4:8081
Starting execution of program
Submitting job with JobID: 2c64b1126f327261b0c43f33f3cf43ee. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@1.2.3.4:44307/user/jobmanager#2001981191]
03/16/2017 09:40:10 Job execution switched to status RUNNING.
03/16/2017 09:40:10 Source: Custom Source -> Flat Map(1/1) switched to SCHEDULED 
03/16/2017 09:40:10 Source: Custom Source -> Flat Map(1/1) switched to DEPLOYING 
03/16/2017 09:40:10 Flat Map(1/1) switched to SCHEDULED 
03/16/2017 09:40:10 Flat Map(1/1) switched to DEPLOYING 
03/16/2017 09:40:10 Flat Map(1/1) switched to RUNNING 
03/16/2017 09:40:10 Source: Custom Source -> Flat Map(1/1) switched to RUNNING 
New JobManager elected. Connecting to null
Connected to JobManager at Actor[akka.tcp://flink@1.2.3.5:43828/user/jobmanager#1400235434]


Killed JobManager

2017-03-16 09:58:14,953 INFO  org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /[Client 1 IP here]:40858
2017-03-16 09:58:14,953 INFO  org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /[Client 1 IP here]:40858
2017-03-16 09:58:14,957 INFO  org.apache.zookeeper.server.ZooKeeperServer - Established session 0x35ad68d8b4d0004 with negotiated timeout 4 for client /[Client 1 IP here]:40858
2017-03-16 09:58:15,523 INFO  org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /[Client 2 IP here]:40276
2017-03-16 09:58:15,528 INFO  org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /[Client 2 IP here]:40276
2017-03-16 09:58:15,531 INFO  org.apache.zookeeper.server.ZooKeeperServer - Established session 0x35ad68d8b4d0005 with negotiated timeout 4 for client /[Client 2 IP here]:40276
2017-03-16 10:10:25,118 WARN  org.apache.zookeeper.server.NIOServerCnxn - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x35ad68d8b4d0002, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)
2017-03-16 10:10:25,120 INFO  org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.4:47872 which had sessionid 0x35ad68d8b4d0002



New Leader

2017-03-16 09:58:17,319 INFO  org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /1.2.3.5:53748
2017-03-16 09:58:17,320 INFO  org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /1.2.3.5:53748
2017-03-16 09:58:17,322 INFO  org.apache.zookeeper.server.ZooKeeperServer - Established session 0x15ad68d898c0006 with negotiated timeout 4 for client /1.2.3.5:53748
2017-03-16 09:58:18,336 INFO  org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.5:53748 which had sessionid 0x15ad68d898c0006
2017-03-16 10:10:23,881 WARN  org.apache.zookeeper.server.NIOServerCnxn - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x15ad68d898c0001, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)
2017-03-16 10:10:23,885 INFO  org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /1.2.3.4:45752 which had sessionid 0x15ad68d898c0001
2017-03-16 10:10:23,885 WARN  org.apache.zookeeper.server.NIOServerCnxn - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x15ad68d898c0002, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)
2017-03-16 10:10:23,887 INFO  org.apache.zookeeper.server.NIOServerCnxn 
- 

[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-15 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926682#comment-15926682
 ] 

Till Rohrmann commented on FLINK-6063:
--

Hi Razvan,

sometimes it can take a little while until ZooKeeper notifies the TaskManagers 
about the new leader. In the meantime, a TaskManager keeps trying to reconnect 
to the old master. But as soon as the new leader information is written to 
ZooKeeper and sent to the TaskManagers, they should try to connect to the new 
leader. How long did you let it run? It would also be helpful to see the log 
of the JobManager that became the new leader and more of the TaskManager logs.
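
To put a number on "a little while", one could measure the window directly from a TaskManager log. The following is a rough Python sketch, not a Flink tool: the name `retry_window` is hypothetical, and the patterns assume the message wording and timestamp layout of the entries quoted in the issue description. It counts the failed Akka associations with the old JobManager and the time from the first failure until the TaskManager is told that the old JobManager lost its leadership.

```python
import re
from datetime import datetime

TS = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})"
# Wording taken from the quoted log entries; treat it as an assumption for other versions.
FAILED = re.compile(
    TS + r" WARN akka\.remote\.ReliableDeliverySupervisor - "
    r"Association with remote system \[(\S+?)\] has failed"
)
LOST = re.compile(
    TS + r" INFO \S+ - TaskManager \S+ disconnects from JobManager "
    r"(\S+?): Old JobManager lost its leadership"
)


def parse_ts(text):
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S,%f")


def retry_window(log_text):
    """Return (failed_attempts, seconds) up to the leadership-lost entry, or None."""
    flat = " ".join(log_text.split())  # undo line wrapping and repeated spaces
    lost = LOST.search(flat)
    if lost is None:
        return None
    lost_at = parse_ts(lost.group(1))
    old_system = lost.group(2).split("/user/")[0]
    failures = [
        parse_ts(ts)
        for ts, system in FAILED.findall(flat)
        if system == old_system and parse_ts(ts) <= lost_at
    ]
    if not failures:
        return 0, 0.0
    return len(failures), (lost_at - min(failures)).total_seconds()


if __name__ == "__main__":
    sample = """
2017-03-15 10:28:38,534 WARN  akka.remote.ReliableDeliverySupervisor
  - Association with remote system
[akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000]
ms. Reason: [Disassociated]
2017-03-15 10:29:02,435 WARN  akka.remote.ReliableDeliverySupervisor
  - Association with remote system
[akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000]
ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused
by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:10,489 INFO
org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager
akka://flink/user/taskmanager disconnects from JobManager
akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its
leadership.
"""
    print(retry_window(sample))
```

For the entries quoted in the issue description, the first failed association is logged at 10:28:38,534 and the leadership change at 10:29:10,489, so the window is roughly 32 seconds.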



> HA Configuration doesn't work with Flink 1.2
> 
>
> Key: FLINK-6063
> URL: https://issues.apache.org/jira/browse/FLINK-6063
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Razvan
>Priority: Critical
>
>  I have a setup with flink 1.2 cluster, made up of 3 JobManagers and 2 
> TaskManagers. I start the Zookeeper Quorum from JobManager1, I get 
> confirmation Zookeeper starts on the other 2 JobManagers then I start a Flink 
> job on this JobManager1.   
>  
>  The flink-conf.yaml is the same on all 5 VMs (also everything else related 
> to flink because I copied the folder across all VMs as suggested in 
> tutorials) this means jobmanager.rpc.address: points to JobManager1 
> everywhere.
> If I turn off the VM running JobManager1 I would expect Zookeeper to say one 
> of the remaining JobManagers is the leader and the TaskManagers should 
> reconnect to it. Instead a new leader is elected but the slaves keep 
> connecting to the old master
> 2017-03-15 10:28:28,655 INFO  org.apache.flink.core.fs.FileSystem 
>   - Ensuring all FileSystem streams are closed for Async 
> calls on Source: Custom Source -> Flat Map (1/1)
> 2017-03-15 10:28:38,534 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Disassociated] 
> 2017-03-15 10:28:46,606 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:28:52,431 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:02,435 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:10,489 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager 
> akka://flink/user/taskmanager disconnects from JobManager 
> akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its 
> leadership.
> 2017-03-15 10:29:10,490 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Cancelling 
> all computations and discarding all cached data.
> 2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Source: Custom Source 
> -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
> 2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source -> Flat Map (1/1) 
> (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager 
> disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: 
> Old JobManager lost its leadership.
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>