Hi everyone,

We are trying to get JobManager HA working for a per-job YARN session using 
1.0.0-rc3 from a few days ago, and we have run into a problem with task 
managers that have several network interfaces.

After manually killing the job manager process, the jobmanager.log on the newly 
allocated second job manager reads:
---
2016-03-02 18:01:09,635 WARN  Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.127.68.136:34811]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.127.68.136:34811
2016-03-02 18:01:09,644 WARN  org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port.
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@10.127.68.136:34811/), Path(/user/jobmanager)]
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
        at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
        at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
        at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
        at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
        at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
        at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
        at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
        at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
        at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
        at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
        at akka.actor.ActorCell.terminate(ActorCell.scala:369)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
---
Here the unreachable address belongs to the old job manager. Is this the 
expected behavior so far?

The problem then arises on a new task manager, which also tries, without 
success, to connect to the old job manager. After the 
ZooKeeperLeaderRetrievalService starts, the task manager cycles through the 
available network interfaces, as can be seen in the relevant taskmanager.log:
---
2016-03-02 18:01:13,636 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService.
2016-03-02 18:01:13,646 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to select the network interface and address to use by connecting to the leading JobManager.
2016-03-02 18:01:13,646 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
2016-03-02 18:01:13,712 INFO  org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address /10.127.68.136:34811.
2016-03-02 18:01:14,079 INFO  org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to address /10.127.68.136:34811
2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address 'task.manager.eth0.hostname.com/10.127.68.136': Connection refused
2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
---
After five repetitions, the task manager stops trying to retrieve the leader, 
falls back to the HEURISTIC strategy, and from then on uses eth1 
(10.120.193.110):
---
2016-03-02 18:01:23,650 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down
2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ZooKeeper - Session: 0x25229757cff035b closed
2016-03-02 18:01:23,664 INFO  org.apache.flink.runtime.taskmanager.TaskManager - TaskManager will use hostname/address 'task.manager.eth1.hostname.com' (10.120.193.110) for communication.
---
Afterwards, the new jobmanager is discovered and the taskmanager is able to 
register with it using eth1. The problem is that connections TO eth1 are not 
possible, so Flink should always use eth0. The exception we see later is:
---
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'other.task.manager.eth1.hostname/10.120.193.111:46620' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
        at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:744)
---
The root cause seems to be that network interface selection is still using the 
old jobmanager location and hence is not able to choose the right interface. In 
particular, it seems that iteration order over the network interfaces differs 
between the HEURISTIC and SLOW strategy, which then leads to the wrong 
interface being selected.
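
To make the suspected mechanism concrete, here is a minimal Java sketch of 
connect-based interface selection. It is not Flink's ConnectionUtils code; the 
class InterfaceProbe and its methods are hypothetical names made up for this 
mail. It binds a socket to each local address in turn and tries to reach the 
target. If the target is a stale job manager address, every attempt is refused 
and the caller is left with whatever fallback it has, which matches what the 
taskmanager.log above shows.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only; not part of Flink.
public class InterfaceProbe {

    /** Collects all addresses of all local interfaces, in the JVM's enumeration order. */
    static List<InetAddress> localAddresses() throws SocketException {
        List<InetAddress> result = new ArrayList<>();
        for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
            result.addAll(Collections.list(nic.getInetAddresses()));
        }
        return result;
    }

    /** Returns true if a socket bound to the given local address can connect to the target. */
    static boolean canConnect(InetAddress local, InetSocketAddress target, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.bind(new InetSocketAddress(local, 0)); // port 0: any free local port
            socket.connect(target, timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // e.g. "Connection refused" when the target process is gone
        }
    }

    /** Returns the first candidate address from which the target is reachable, if any. */
    static Optional<InetAddress> firstReachableFrom(
            List<InetAddress> candidates, InetSocketAddress target, int timeoutMs) {
        for (InetAddress local : candidates) {
            if (canConnect(local, target, timeoutMs)) {
                return Optional.of(local);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: InterfaceProbe <targetHost> <targetPort>");
            return;
        }
        InetSocketAddress target = new InetSocketAddress(args[0], Integer.parseInt(args[1]));
        Optional<InetAddress> chosen = firstReachableFrom(localAddresses(), target, 1000);
        System.out.println(chosen
                .map(a -> "reachable from " + a)
                .orElse("target not reachable from any local address; a fallback is needed"));
    }
}
```

Run against the address of a job manager that is still alive, a probe like 
this would be expected to pick the interface that can actually reach it; run 
against the old address, it cannot distinguish eth0 from eth1 at all.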

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
