No, I don't think this behaviour was introduced by HA. It has been the default behaviour for a long time. If you think we should still change it, I can open an issue for it.
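For reference, here is a minimal sketch of the address filter discussed further down in this thread (IPv4, not link-local, not loopback). It is not Flink's actual ConnectionUtils code: the class name InterfaceHeuristicSketch and the methods isCandidate and firstCandidate are made up for illustration, and the final fallback to InetAddress.getLocalHost() is only printed for comparison, not wired into any connection logic.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;

// Hypothetical helper, for illustration only (not part of Flink).
final class InterfaceHeuristicSketch {

    // Conditions quoted in this thread: IPv4, not link-local, not loopback.
    static boolean isCandidate(InetAddress address) {
        return address instanceof Inet4Address
                && !address.isLinkLocalAddress()
                && !address.isLoopbackAddress();
    }

    // Returns the first address that passes isCandidate, or null if none does.
    // The iteration order is whatever the JVM/OS reports, so on a machine with
    // eth0 and eth1 either one may come first.
    static InetAddress firstCandidate() throws SocketException {
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return null;
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            for (InetAddress address : Collections.list(nic.getInetAddresses())) {
                if (isCandidate(address)) {
                    return address;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        // Compare the heuristic pick with the address bound to the host name.
        System.out.println("first heuristic candidate:  " + firstCandidate());
        System.out.println("InetAddress.getLocalHost(): " + InetAddress.getLocalHost());
    }
}
```

Running this on one of the affected task manager machines should show whether the first candidate and the getLocalHost() address differ, which is the eth1 vs. eth0 situation described below.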
On Thu, Mar 3, 2016 at 12:20 PM, Stephan Ewen <se...@apache.org> wrote:
> Okay, that is a change from the original behavior, introduced in HA.
> Originally, if the connection attempts failed, it always returned the
> InetAddress.getLocalHost() interface.
> I think we should change it back to that, because that interface is by far
> the best possible heuristic.
>
> On Thu, Mar 3, 2016 at 11:39 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> If I’m not mistaken, then it’s not necessarily true that the heuristic
>> returns InetAddress.getLocalHost() in all cases. The heuristic will
>> select the first network interface with the afore-mentioned conditions but
>> before returning it, it will try a last time to connect to the JM via the
>> interface bound to InetAddress.getLocalHost(). However, if this fails,
>> then the heuristically selected network interface will be returned.
>>
>>
>> On Thu, Mar 3, 2016 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> If the TaskManager cannot connect to the JobManager, it will use the
>>> interface that is bound to the machine's host name
>>> ("InetAddress.getLocalHost()").
>>>
>>> So, the best way to fix this would be to make sure that all machines
>>> have a proper network configuration. Then Flink would either use an address
>>> that can connect (via trying various interfaces), or it would default back
>>> to the hostname/interface that is configured on the machine.
>>>
>>>
>>> On Thu, Mar 3, 2016 at 10:43 AM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Max,
>>>>
>>>> the problem is that before starting the TM, we have to find the network
>>>> interface which is reachable by the other machines. So what we do is to
>>>> connect to the current JobManager. If it should happen, as in your case,
>>>> that the JobManager just died and the new JM address has not been written
>>>> to ZooKeeper, then the TMs don’t have much choice other than using the
>>>> heuristic.
>>>>
>>>> I can’t really tell why eth1 is chosen over eth0. The condition is that
>>>> the interface address is an Inet4Address, is not a link-local address,
>>>> and is not a loopback address.
>>>>
>>>> Thus, Ufuk’s solution, to increase akka.lookup.timeout, seems to be the
>>>> easiest way to solve your problem. I’ve checked: the default value is
>>>> set to 10 s, which might be a bit too low for restarting a new JM and
>>>> publishing its address via ZooKeeper.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>>
>>>> On Thu, Mar 3, 2016 at 10:28 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>>
>>>>> I had an offline chat with Till about this. He pointed out that the
>>>>> address is chosen once at start up time (while not being able to
>>>>> connect to the old job manager) and then it stays fixed at eth1.
>>>>>
>>>>> You can increase the lookup timeout by setting akka.lookup.timeout to
>>>>> a higher value (like 100 s). This is the only workaround I'm aware of
>>>>> at this point. Maybe Till can chime in here whether this has other
>>>>> implications as well?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On Thu, Mar 3, 2016 at 9:59 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>>> > Hey Max!
>>>>> >
>>>>> > for the first WARN in
>>>>> > org.apache.flink.runtime.webmonitor.JobManagerRetriever: this is
>>>>> > expected if the new leader has not updated ZooKeeper yet. The
>>>>> > important thing is that the new leading job manager is eventually
>>>>> > retrieved. This did happen, right?
>>>>> >
>>>>> > Regarding eth1 vs. eth0: After the new job manager becomes leader, the
>>>>> > task manager should re-try connecting to it with the same strategy as
>>>>> > in the initial connection establishment (e.g. try SLOW first and only
>>>>> > fall back to HEURISTIC). Can you see in the logs whether this happens?
>>>>> >
>>>>> > The best thing would be to share the complete logs. Is this possible?
>>>>> > If not publicly, feel free to send it to me privately (uce at apache
>>>>> > org).
>>>>> >
>>>>> > – Ufuk
>>>>> >
>>>>> >
>>>>> > On Thu, Mar 3, 2016 at 9:21 AM, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
>>>>> >> Hi everyone,
>>>>> >>
>>>>> >> we are trying to get to work JobManager HA in the context of a per-job YARN
>>>>> >> session using the 1.0.0-rc3 from a few days ago and are having a problem
>>>>> >> concerning task managers with several network interfaces.
>>>>> >>
>>>>> >> After manually killing the job manager process, the jobmanager.log on the
>>>>> >> newly allocated second job manager reads:
>>>>> >> ---
>>>>> >> 2016-03-02 18:01:09,635 WARN Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.127.68.136:34811]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.127.68.136:34811
>>>>> >> 2016-03-02 18:01:09,644 WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port.
>>>>> >> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@10.127.68.136:34811/), Path(/user/jobmanager)]
>>>>> >> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>>>>> >> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>>>>> >> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>>> >> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>>> >> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>>>>> >> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>>>>> >> at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>>>>> >> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>>>>> >> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>>>> >> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>>>> >> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>>>>> >> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
>>>>> >> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
>>>>> >> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
>>>>> >> at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
>>>>> >> at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
>>>>> >> at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>>>>> >> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
>>>>> >> at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>>>>> >> at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>>>>> >> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>>>>> >> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>>>>> >> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>>>>> >> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>>>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>> >> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> >> ---
>>>>> >> where the IP not found is from the old job manager. So far, is this the
>>>>> >> expected behavior?
>>>>> >>
>>>>> >> The problem then arises on a new task manager, which also tries to connect
>>>>> >> to the old job manager unsuccessfully. The ZooKeeperLeaderRetrievalService
>>>>> >> starts cycling through the available network interfaces, as can be seen in
>>>>> >> the relevant taskmanager.log:
>>>>> >> ---
>>>>> >> 2016-03-02 18:01:13,636 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService.
>>>>> >> 2016-03-02 18:01:13,646 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to select the network interface and address to use by connecting to the leading JobManager.
>>>>> >> 2016-03-02 18:01:13,646 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
>>>>> >> 2016-03-02 18:01:13,712 INFO org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address /10.127.68.136:34811.
>>>>> >> 2016-03-02 18:01:14,079 INFO org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to address /10.127.68.136:34811
>>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address 'task.manager.eth0.hostname.com/10.127.68.136': Connection refused
>>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
>>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
>>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
>>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
>>>>> >> ---
>>>>> >> After five repetitions, the task manager stops trying to retrieve the leader
>>>>> >> and, using the HEURISTIC strategy, ends up using eth1 (10.120.193.110) from
>>>>> >> now on:
>>>>> >> ---
>>>>> >> 2016-03-02 18:01:23,650 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
>>>>> >> 2016-03-02 18:01:23,655 INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
>>>>> >> 2016-03-02 18:01:23,655 INFO org.apache.zookeeper.ZooKeeper - Session: 0x25229757cff035b closed
>>>>> >> 2016-03-02 18:01:23,664 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager will use hostname/address 'task.manager.eth1.hostname.com' (10.120.193.110) for communication.
>>>>> >> ---
>>>>> >> Following this, the new jobmanager is discovered and the taskmanager is able to
>>>>> >> register at the jobmanager using eth1. The problem is that connections TO
>>>>> >> eth1 are not possible. So flink should always use eth0. The exception we
>>>>> >> later see is:
>>>>> >> ---
>>>>> >> java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'other.task.manager.eth1.hostname/10.120.193.111:46620' has failed. This might indicate that the remote task manager has been lost.
>>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>>>>> >> at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
>>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388)
>>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411)
>>>>> >> at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
>>>>> >> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>>>>> >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> >> at java.lang.Thread.run(Thread.java:744)
>>>>> >> ---
>>>>> >> The root cause seems to be that network interface selection is still using
>>>>> >> the old jobmanager location and hence is not able to choose the right
>>>>> >> interface.
>>>>> >> In particular, it seems that iteration order over the network
>>>>> >> interfaces differs between the HEURISTIC and SLOW strategy, which then leads
>>>>> >> to the wrong interface being selected.
>>>>> >>
>>>>> >> Cheers,
>>>>> >> Max
>>>>> >> --
>>>>> >> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>>> >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> >> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>> >>
>>>>>
>>>>
>>>>
>>>
>>
>