[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14948584#comment-14948584 ]

ASF GitHub Bot commented on FLINK-2790:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1213

> Add high availability support for Yarn
> --
>
> Key: FLINK-2790
> URL: https://issues.apache.org/jira/browse/FLINK-2790
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager, TaskManager
> Reporter: Till Rohrmann
> Fix For: 0.10
>
>
> Add master high availability support for Yarn. The idea is to let Yarn
> restart a failed application master in a new container. For that, we set the
> number of application retries to something greater than 1.
> From version 2.4.0 onwards, it is possible to reuse already started
> containers for the TaskManagers, thus, avoiding unnecessary restart delays.
> From version 2.6.0 onwards, it is possible to specify an interval in which
> the number of application attempts have to be exceeded in order to fail the
> job. This will prevent long running jobs from eventually depleting all
> available application attempts.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
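The attempt-failure validity interval described in the issue (Hadoop 2.6.0 onwards) can be pictured with a small Java model. This is a simplified sketch, not the ResourceManager's code: the class `AttemptFailureWindow` is hypothetical, and it assumes that only failures younger than the interval are counted and that the application fails once that count reaches the maximum number of attempts.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical, simplified model of an attempt-failure validity interval:
 * only failures that happened less than validityIntervalMs ago count
 * towards maxAppAttempts. This is not YARN's ResourceManager code.
 */
class AttemptFailureWindow {
    private final int maxAppAttempts;
    private final long validityIntervalMs;
    // Timestamps (ms) of failed attempts still inside the interval, oldest first.
    private final Deque<Long> failures = new ArrayDeque<>();

    AttemptFailureWindow(int maxAppAttempts, long validityIntervalMs) {
        this.maxAppAttempts = maxAppAttempts;
        this.validityIntervalMs = validityIntervalMs;
    }

    /** Records a failed attempt at nowMs; returns true if the whole application should fail. */
    boolean recordFailure(long nowMs) {
        failures.addLast(nowMs);
        // Forget failures that are at least validityIntervalMs old.
        while (!failures.isEmpty() && nowMs - failures.peekFirst() >= validityIntervalMs) {
            failures.removeFirst();
        }
        return failures.size() >= maxAppAttempts;
    }

    public static void main(String[] args) {
        AttemptFailureWindow window = new AttemptFailureWindow(2, 10_000L);
        System.out.println(window.recordFailure(0L));      // false: one recent failure
        System.out.println(window.recordFailure(20_000L)); // false: the failure at 0 expired
        System.out.println(window.recordFailure(25_000L)); // true: two failures within 10 s
    }
}
```

Without the interval, the second failure alone would already exhaust two attempts, which is the depletion problem the issue describes for long-running jobs.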
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946727#comment-14946727 ]

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-146170086

I have a dependency problem with Curator leading to:

```bash
ERROR org.apache.flink.runtime.jobmanager.JobManager- Error while starting up JobManager
java.lang.NoSuchMethodError: org.apache.curator.utils.PathUtils.validatePath(Ljava/lang/String;)Ljava/lang/String;
    at org.apache.curator.framework.imps.NamespaceImpl.<init>(NamespaceImpl.java:37)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:113)
    at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:124)
    at org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:83)
    at org.apache.flink.runtime.util.ZooKeeperUtils.createLeaderElectionService(ZooKeeperUtils.java:145)
    at org.apache.flink.runtime.util.LeaderElectionUtils.createLeaderElectionService(LeaderElectionUtils.java:52)
    at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:1595)
    at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:1672)
    at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:1629)
    at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:1307)
    at org.apache.flink.yarn.ApplicationMasterBase.runAction(ApplicationMasterBase.scala:127)
    at org.apache.flink.yarn.ApplicationMasterBase$$anon$1.run(ApplicationMasterBase.scala:76)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1608)
    at org.apache.flink.yarn.ApplicationMasterBase.run(ApplicationMasterBase.scala:74)
    at org.apache.flink.yarn.ApplicationMaster$.main(ApplicationMaster.scala:35)
    at org.apache.flink.yarn.ApplicationMaster.main(ApplicationMaster.scala)
```

Our application master class path:

```bash
13:45:50,795 DEBUG org.apache.flink.yarn.ApplicationMaster - All environment variables: {PATH=/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/opt/X11/bin, HADOOP_CONF_DIR=/Users/ufuk/Downloads/hadoop-2.6.0/etc/hadoop, MAX_APP_ATTEMPTS=2, HADOOP_SECURE_DN_PID_DIR=, HADOOP_PID_DIR=, MAIL=/var/mail/ufuk, LD_LIBRARY_PATH=:/Users/ufuk/Downloads/hadoop-2.6.0/lib/native:/Users/ufuk/Downloads/hadoop-2.6.0/lib/native, LOGNAME=ufuk, JVM_PID=13307, _DETACHED=false, PWD=/tmp/hadoop-ufuk/nm-local-dir/usercache/ufuk/appcache/application_1444217271951_0004/container_1444217271951_0004_01_01, HADOOP_YARN_USER=yarn, HADOOP_PREFIX=/Users/ufuk/Downloads/hadoop-2.6.0, LOCAL_DIRS=/tmp/hadoop-ufuk/nm-local-dir/usercache/ufuk/appcache/application_1444217271951_0004, YARN_IDENT_STRING=ufuk, HADOOP_SECURE_DN_LOG_DIR=/, SHELL=/bin/zsh, YARN_CONF_DIR=/Users/ufuk/Downloads/hadoop-2.6.0/etc/hadoop, JAVA_MAIN_CLASS_10305=org.apache.hadoop.yarn.server.nodemanager.NodeManager, LOG_DIRS=/Users/ufuk/Downloads/hadoop-2.6.0/logs/userlogs/application_1444217271951_0004/container_1444217271951_0004_01_01, _CLIENT_SHIP_FILES=file:/Users/ufuk/.flink/application_1444217271951_0004/flink-python-0.10-SNAPSHOT.jar,file:/Users/ufuk/.flink/application_1444217271951_0004/log4j-1.2.17.jar,file:/Users/ufuk/.flink/application_1444217271951_0004/slf4j-log4j12-1.7.7.jar,file:/Users/ufuk/.flink/application_1444217271951_0004/logback.xml,file:/Users/ufuk/.flink/application_1444217271951_0004/log4j.properties, _CLIENT_USERNAME=ufuk, HADOOP_YARN_HOME=/Users/ufuk/Downloads/hadoop-2.6.0, TMPDIR=/var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/, HADOOP_DATANODE_OPTS=-Dhadoop.security.logger=ERROR,RFAS -Dhadoop.security.logger=ERROR,RFAS , 
HADOOP_SECONDARYNAMENODE_OPTS=-Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender -Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender , _FLINK_JAR_PATH=file:/Users/ufuk/.flink/application_1444217271951_0004/flink-dist-0.10-SNAPSHOT.jar, __CF_USER_TEXT_ENCODING=0x1F5:0x0:0x0, LC_CTYPE=UTF-8, _CLIENT_TM_COUNT=2, _CLIENT_TM_MEMORY=1024, SHLVL=3, HADOOP_IDENT_STRING=ufuk, YARN_ROOT_LOGGER=INFO,RFA, _SLOTS=-1, _CLIENT_HOME_DIR=file:/Users/ufuk, APP_SUBMIT_TIME_ENV=1444218347305, NM_HOST=192.168.178.69, _APP_ID=application_1444217271951_0004, YARN_LOGFILE=yarn-ufuk-nodemanager-vinci.local.log, HADOOP_SECURE_DN_USER=,
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946643#comment-14946643 ]

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41374549

--- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, ActorRef}
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
+  RequestLeaderSessionID}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
+  AcknowledgeRegistration}
+import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, TaskInFinalState}
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+  CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a TaskManager with messages for testing purposes.
+  *
--- End diff --

empty line
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946612#comment-14946612 ]

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-146136443

I found the issue. My code did not properly call the methods `setKeepContainersAcrossApplicationAttempts` and `setAttemptFailuresValidityInterval` via reflection. With the fix, already started containers are retained. Tested it with Yarn 2.7.1.
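The two setters named above only exist in newer Hadoop versions (2.4.0 and 2.6.0, according to the issue description), which is presumably why the client reaches them via reflection instead of calling them directly. A minimal sketch of that pattern follows; `OptionalSetters.invokeIfPresent` and the stand-in class `LegacyContext` are hypothetical names, and this is not the fix from the pull request.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class OptionalSetters {
    /**
     * Calls target.methodName(arg) if the runtime class of target has a public method with
     * that name and parameter type. Returns false instead of failing when the method is
     * missing, e.g. on a library version that predates it.
     */
    static boolean invokeIfPresent(Object target, String methodName, Class<?> paramType, Object arg) {
        final Method method;
        try {
            method = target.getClass().getMethod(methodName, paramType);
        } catch (NoSuchMethodException e) {
            return false; // Not available in this version: skip the optional feature.
        }
        try {
            method.invoke(target, arg);
            return true;
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Could not invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        LegacyContext ctx = new LegacyContext();
        boolean kept = invokeIfPresent(
            ctx, "setKeepContainersAcrossApplicationAttempts", boolean.class, true);
        boolean interval = invokeIfPresent(
            ctx, "setAttemptFailuresValidityInterval", long.class, 10_000L);
        System.out.println(kept + " " + interval + " " + ctx.keepContainers); // true false true
    }
}

/** Hypothetical stand-in for an older submission context that lacks the 2.6.0 setter. */
class LegacyContext {
    boolean keepContainers;

    public void setKeepContainersAcrossApplicationAttempts(boolean keep) {
        this.keepContainers = keep;
    }
}
```

Returning `false` for a missing method lets the caller log that the feature is unavailable instead of aborting the submission on older clusters.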
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946613#comment-14946613 ]

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-146136479

Good catch @rmetzger :-)
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946657#comment-14946657 ]

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-146150504

The code looks good. :-) I like the changes around the testing classes and the changes to the retrieval utils (that one I've looked into in more detail). I didn't check the YARN logic in detail though.

I'm going to try it out now locally and report back. I expect that to work just fine and then we can safely merge this I think. :)
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946640#comment-14946640 ]

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41374461

--- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, Cancellable, ActorRef}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+  CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purpose.
+  *
--- End diff --

empty line
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946642#comment-14946642 ]

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41374493

--- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, Cancellable, ActorRef}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+  CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purpose.
+  *
+  */
+trait TestingJobManagerLike extends FlinkActor {
+  that: JobManager =>
+
+  import scala.collection.JavaConverters._
+  import context._
+
+  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+
+  val waitForAllVerticesToBeRunningOrFinished =
+    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+
+  var periodicCheck: Option[Cancellable] = None
+
+  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
+    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
+
+  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
+
+  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
+
+  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
+    new Ordering[(Int, ActorRef)] {
+      override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
+    })
+
+  var disconnectDisabled = false
+
+  abstract override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+
+    case RequestExecutionGraph(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
+          ExecutionGraphFound(
+            jobID,
+            executionGraph)
+        )
+
+        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
+      }
+
+    case WaitForAllVerticesToBeRunning(jobID) =>
+      if(checkIfAllVerticesRunning(jobID)){
+        sender() ! decorateMessage(AllVerticesRunning(jobID))
+      }else{
+        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
+        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
+
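The `handleMessage` override in the diff above follows Scala's stackable-trait pattern: `handleTestingMessage orElse super.handleMessage` tries the testing handlers first and falls back to the JobManager's own behaviour. The Java sketch below mimics `PartialFunction#orElse` with `Optional`-returning handlers to show that fallback order; `MessageHandler` and `HandlerChainDemo` are hypothetical, and the sketch models only the dispatch logic, not Akka actors.

```java
import java.util.Optional;
import java.util.function.Function;

class HandlerChainDemo {
    // Hypothetical stand-in for the JobManager's own handleMessage.
    static final MessageHandler BASE =
        msg -> msg instanceof String ? Optional.of("base handled " + msg) : Optional.empty();

    // Hypothetical stand-in for handleTestingMessage: answers only the testing message "Alive".
    static final MessageHandler TESTING =
        msg -> "Alive".equals(msg) ? Optional.of("Acknowledge") : Optional.empty();

    // Same order as `handleTestingMessage orElse super.handleMessage`.
    static final MessageHandler COMBINED = TESTING.orElse(BASE);

    public static void main(String[] args) {
        System.out.println(COMBINED.apply("Alive"));  // Optional[Acknowledge]
        System.out.println(COMBINED.apply("Submit")); // Optional[base handled Submit]
        System.out.println(COMBINED.apply(42));       // Optional.empty
    }
}

/** Returns Optional.empty() for messages it does not handle (like isDefinedAt == false). */
interface MessageHandler extends Function<Object, Optional<String>> {
    /** Tries this handler first and falls back to `fallback` for unhandled messages. */
    default MessageHandler orElse(MessageHandler fallback) {
        return msg -> {
            Optional<String> result = this.apply(msg);
            return result.isPresent() ? result : fallback.apply(msg);
        };
    }
}
```

Because the testing handler is consulted first, it can shadow a production message when needed, while every message it does not recognise still reaches the regular JobManager logic.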
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946651#comment-14946651 ]

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41374754

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java ---
@@ -0,0 +1,867 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+* All classes in this package contain code taken from
+* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+* and
+* https://github.com/hortonworks/simple-yarn-app
+* and
+* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+*
+* The Flink jar is uploaded to HDFS by this client.
+* The application master and all the TaskManager containers get the jar file downloaded
+* by YARN into their local fs.
+*
+*/
+public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
+
+	/**
+	* Constants,
+	* all starting with ENV_ are used as environment variables to pass values from the Client
+	* to the Application Master.
+	*/
+	public final static String
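The Javadoc above says that constants starting with `ENV_` are passed as environment variables from the client to the application master; the environment dump earlier in this thread contains entries such as `_CLIENT_TM_COUNT=2` and `_CLIENT_TM_MEMORY=1024`. A minimal sketch of reading such a value on the application master side, assuming a plain decimal encoding and a caller-supplied default (`AppMasterEnv.readInt` is a hypothetical helper, not Flink's code):

```java
import java.util.HashMap;
import java.util.Map;

class AppMasterEnv {
    /**
     * Reads an integer passed from the client via the container environment.
     * Returns defaultValue if the variable is absent or blank; rejects malformed values.
     */
    static int readInt(Map<String, String> env, String key, int defaultValue) {
        String raw = env.get(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid integer for " + key + ": " + raw, e);
        }
    }

    public static void main(String[] args) {
        // In a real container this map would be System.getenv(); a fixed map keeps the demo deterministic.
        Map<String, String> env = new HashMap<>();
        env.put("_CLIENT_TM_COUNT", "2");
        env.put("_CLIENT_TM_MEMORY", "1024");
        System.out.println(readInt(env, "_CLIENT_TM_COUNT", 1));    // 2
        System.out.println(readInt(env, "_CLIENT_TM_MEMORY", 512)); // 1024
        System.out.println(readInt(env, "MAX_APP_ATTEMPTS", 1));    // 1 (absent in this map)
    }
}
```

Failing loudly on a malformed value, rather than silently using the default, makes a client/application-master mismatch visible at startup.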
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946825#comment-14946825 ]

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41387623

--- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, Cancellable, ActorRef}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+  CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purpose.
+  *
+  */
+trait TestingJobManagerLike extends FlinkActor {
+  that: JobManager =>
+
+  import scala.collection.JavaConverters._
+  import context._
+
+  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+
+  val waitForAllVerticesToBeRunningOrFinished =
+    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+
+  var periodicCheck: Option[Cancellable] = None
+
+  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
+    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
+
+  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
+
+  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
+
+  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
+    new Ordering[(Int, ActorRef)] {
+      override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
+    })
+
+  var disconnectDisabled = false
+
+  abstract override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+
+    case RequestExecutionGraph(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
+          ExecutionGraphFound(
+            jobID,
+            executionGraph)
+        )
+
+        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
+      }
+
+    case WaitForAllVerticesToBeRunning(jobID) =>
+      if(checkIfAllVerticesRunning(jobID)){
+        sender() ! decorateMessage(AllVerticesRunning(jobID))
+      }else{
+        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
+        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
+
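`waitForNumRegisteredTaskManagers` in the diff above is a priority queue whose `Ordering` (`y._1 - x._1`) makes the waiter with the smallest required TaskManager count the head, so each registration only needs to inspect the head of the queue. A Java sketch of the same idea, assuming waiters are notified as soon as the registered count reaches their requirement; `RegistrationWaiters` is hypothetical and uses plain strings in place of `ActorRef`s.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class RegistrationWaiters {
    /** One waiting party: how many TaskManagers it needs and whom to notify. */
    private static final class Waiter {
        final int requiredTaskManagers;
        final String name; // stands in for the waiting ActorRef

        Waiter(int requiredTaskManagers, String name) {
            this.requiredTaskManagers = requiredTaskManagers;
            this.name = name;
        }
    }

    // Smallest requirement at the head; Integer.compare avoids overflow from subtracting counts.
    private final PriorityQueue<Waiter> waiters =
        new PriorityQueue<>((a, b) -> Integer.compare(a.requiredTaskManagers, b.requiredTaskManagers));

    void await(int requiredTaskManagers, String name) {
        waiters.add(new Waiter(requiredTaskManagers, name));
    }

    /** Called after a registration; removes and returns every waiter whose requirement is now met. */
    List<String> onRegistered(int registeredTaskManagers) {
        List<String> notified = new ArrayList<>();
        while (!waiters.isEmpty() && waiters.peek().requiredTaskManagers <= registeredTaskManagers) {
            notified.add(waiters.poll().name);
        }
        return notified;
    }

    public static void main(String[] args) {
        RegistrationWaiters w = new RegistrationWaiters();
        w.await(3, "c");
        w.await(1, "a");
        w.await(2, "b");
        System.out.println(w.onRegistered(2)); // [a, b]
        System.out.println(w.onRegistered(3)); // [c]
    }
}
```

With realistic TaskManager counts the subtraction in the quoted Scala comparator cannot overflow, so `Integer.compare` here is a defensive choice rather than a fix.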
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946824#comment-14946824 ]

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41387583

--- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.runtime.testingUtils + +import akka.actor.{Terminated, Cancellable, ActorRef} +import akka.pattern.{ask, pipe} +import org.apache.flink.api.common.JobID +import org.apache.flink.runtime.FlinkActor +import org.apache.flink.runtime.execution.ExecutionState +import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.jobmanager.JobManager +import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged +import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} +import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager +import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ +import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect, +CheckIfJobRemoved, Alive} +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged + +import scala.collection.mutable +import scala.concurrent.Future +import scala.concurrent.duration._ + +import language.postfixOps + +/** This mixin can be used to decorate a JobManager with messages for testing purpose. + * --- End diff -- Let's see whether it still recognized as a ScalaDoc > Add high availability support for Yarn > -- > > Key: FLINK-2790 > URL: https://issues.apache.org/jira/browse/FLINK-2790 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Reporter: Till Rohrmann > Fix For: 0.10 > > > Add master high availability support for Yarn. The idea is to let Yarn > restart a failed application master in a new container. For that, we set the > number of application retries to something greater than 1. > From version 2.4.0 onwards, it is possible to reuse already started > containers for the TaskManagers, thus, avoiding unnecessary restart delays. 
> From version 2.6.0 onwards, it is possible to specify an interval in which > the number of application attempts have to be exceeded in order to fail the > job. This will prevent long running jobs from eventually depleting all > available application attempts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
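The mixin's ScalaDoc says it decorates a JobManager with testing messages. The mechanism, visible in the trait's `handleMessage` override quoted earlier in this thread (`handleTestingMessage orElse super.handleMessage`), is partial-function chaining: test-only messages are tried first and everything else falls through to the production handler. Java has no `PartialFunction`, so the sketch below models it with a hypothetical `MessageHandler` interface that reports whether it consumed a message; the handler names and the string messages are illustrative stand-ins for real Akka messages.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java analogue of Scala's PartialFunction#orElse as used by the testing mixins:
// a handler reports whether it consumed the message, and orElse falls through to the next one.
interface MessageHandler {
    boolean handle(String message, List<String> replies);

    default MessageHandler orElse(MessageHandler fallback) {
        return (message, replies) -> handle(message, replies) || fallback.handle(message, replies);
    }
}

final class Handlers {
    private Handlers() {
    }

    // Stands in for the production JobManager's handleMessage.
    static final MessageHandler PRODUCTION = (message, replies) -> {
        if (message.startsWith("SubmitJob")) {
            replies.add("JobSubmitted");
            return true;
        }
        return false;
    };

    // Stands in for handleTestingMessage: only understands test-only messages such as Alive.
    static final MessageHandler TESTING = (message, replies) -> {
        if (message.equals("Alive")) {
            replies.add("Acknowledge");
            return true;
        }
        return false;
    };

    // Testing messages take precedence; anything else reaches the production handler.
    static final MessageHandler DECORATED = TESTING.orElse(PRODUCTION);
}
```

Keeping the testing handler separate means production code never has to know about test-only messages; in Scala the `abstract override` stackable trait achieves the same layering at mix-in time.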
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946822#comment-14946822 ] ASF GitHub Bot commented on FLINK-2790:

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1213#discussion_r41387526

--- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---
@@ -103,18 +103,17 @@ class TestingCluster(
       instanceManager,
       scheduler,
       libraryCacheManager,
-      _,
       executionRetries,
       delayBetweenRetries,
       timeout,
       archiveCount,
-      leaderElectionService) = JobManager.createJobManagerComponents(config)
+      leaderElectionService) = JobManager.createJobManagerComponents(
+        config,
--- End diff --

Good catch
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946827#comment-14946827 ] ASF GitHub Bot commented on FLINK-2790:

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1213#discussion_r41387700

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java ---
@@ -0,0 +1,867 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+* All classes in this package contain code taken from
+* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+* and
+* https://github.com/hortonworks/simple-yarn-app
+* and
+* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+*
+* The Flink jar is uploaded to HDFS by this client.
+* The application master and all the TaskManager containers get the jar file downloaded
+* by YARN into their local fs.
+*
+*/
+public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
+
+    /**
+     * Constants,
+     * all starting with ENV_ are used as environment variables to pass values from the Client
+     * to the Application Master.
+     */
+    public final static
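The class Javadoc above describes passing values from the client to the Application Master through `ENV_`-prefixed environment variables. A minimal sketch of that round trip follows, with the environment modeled as a plain `Map` (YARN's `ContainerLaunchContext#setEnvironment` takes a `Map<String, String>`). The `AmEnvironment` class and the constant names and values are hypothetical illustrations, not Flink's actual constants.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the ENV_ convention from the Javadoc: the client writes settings into the
// environment map of the ApplicationMaster container, and the AM parses them on startup.
// The constant names and values are hypothetical, not Flink's actual constants.
final class AmEnvironment {
    static final String ENV_TM_COUNT = "_EXAMPLE_TM_COUNT";
    static final String ENV_TM_MEMORY = "_EXAMPLE_TM_MEMORY";

    private AmEnvironment() {
    }

    // Client side: a real client would hand this map to ContainerLaunchContext#setEnvironment.
    static Map<String, String> build(int taskManagerCount, int taskManagerMemoryMb) {
        Map<String, String> env = new HashMap<>();
        env.put(ENV_TM_COUNT, Integer.toString(taskManagerCount));
        env.put(ENV_TM_MEMORY, Integer.toString(taskManagerMemoryMb));
        return env;
    }

    // AM side: a real AM would pass System.getenv() here instead of the client's map.
    static int requireInt(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null) {
            throw new IllegalStateException("Missing environment variable " + key);
        }
        return Integer.parseInt(value);
    }
}
```

Failing fast on a missing key makes a mismatch between client and AM versions visible at AM startup instead of surfacing later as a confusing default.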
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946826#comment-14946826 ] ASF GitHub Bot commented on FLINK-2790:

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1213#discussion_r41387670

--- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, ActorRef}
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
+  RequestLeaderSessionID}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
+  AcknowledgeRegistration}
+import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, TaskInFinalState}
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+  CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a TaskManager with messages for testing purposes.
+ *
--- End diff --

Fixed
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14947033#comment-14947033 ] ASF GitHub Bot commented on FLINK-2790:

Github user uce commented on the pull request:
https://github.com/apache/flink/pull/1213#issuecomment-146232148

Smooth experience now. Works as expected with vanilla YARN 2.6.0. +1 to merge.
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14947059#comment-14947059 ] ASF GitHub Bot commented on FLINK-2790:

Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/1213#issuecomment-146239375

Thanks for the review @uce. I addressed your comments. I fixed the problem with Hadoop 2.6.0 by relocating Flink's Curator dependency to `org.apache.flink.shaded.org.apache.curator`. Furthermore, I bumped the Guava version of `flink-shaded-curator` to Flink's Guava version so that we don't include too many different Guava versions in the resulting `flink-dist.jar`.
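Relocating Curator presumably keeps Flink's copy from clashing with the Curator version that Hadoop 2.6.0 brings onto the classpath; the `NoSuchMethodError` on `PathUtils.validatePath` reported earlier in this thread is the typical symptom of two versions of one class competing. The sketch below is a simplified model of what a shade-style relocation rule (the maven-shade-plugin's `<pattern>`/`<shadedPattern>` pair) does to class names. The `Relocation` class is hypothetical; the real plugin also rewrites every reference to those classes inside the bytecode.

```java
// Simplified model of a shade-style relocation rule: any class name under `pattern`
// is rewritten to live under `shadedPattern`. This only illustrates the effect on
// names; it is not how the maven-shade-plugin is implemented.
final class Relocation {
    private final String pattern;
    private final String shadedPattern;

    Relocation(String pattern, String shadedPattern) {
        this.pattern = pattern;
        this.shadedPattern = shadedPattern;
    }

    // Handles dotted names (org.apache.curator.X) and the slash-separated
    // internal names used inside class files (org/apache/curator/X).
    String relocate(String className) {
        if (className.indexOf('/') >= 0) {
            String p = pattern.replace('.', '/') + "/";
            String s = shadedPattern.replace('.', '/') + "/";
            return className.startsWith(p) ? s + className.substring(p.length()) : className;
        }
        String p = pattern + ".";
        return className.startsWith(p)
            ? shadedPattern + "." + className.substring(p.length())
            : className;
    }
}
```

For example, `new Relocation("org.apache.curator", "org.apache.flink.shaded.org.apache.curator").relocate("org.apache.curator.utils.PathUtils")` yields `org.apache.flink.shaded.org.apache.curator.utils.PathUtils`, while classes outside the pattern, such as Guava's, are left untouched. Matching on the trailing separator keeps a package like `org.apache.curatorx` from being relocated by accident.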
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944675#comment-14944675 ] Till Rohrmann commented on FLINK-2790:

What did the logs say? I'll try to reproduce it.

On Mon, Oct 5, 2015 at 5:50 PM, ASF GitHub Bot (JIRA)
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943008#comment-14943008 ] ASF GitHub Bot commented on FLINK-2790:

Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/1213#issuecomment-145450115

I think TMs are only kept alive if their containers have been properly started. If the AM happens to die while the TM containers are still being started, I think they will be terminated as well. Another question is how you killed the AM, and what you mean by "[...] restarting properly. But I think that's not the expected behavior"?
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943014#comment-14943014 ] ASF GitHub Bot commented on FLINK-2790:

Github user uce commented on the pull request:
https://github.com/apache/flink/pull/1213#issuecomment-145451351

Looks like a lot of work to figure out the different version behaviours. Good job, and thanks for the clear explanation. :) I guess by "not restarting properly" Robert meant that the TMs were restarted as well. How does the way you kill the AM affect recovery? I will try this out later today.
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943029#comment-14943029 ] ASF GitHub Bot commented on FLINK-2790:

Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/1213#issuecomment-145456411

I was just curious whether he killed them gracefully with a `PoisonPill` or by killing the JVM process.
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943559#comment-14943559 ] ASF GitHub Bot commented on FLINK-2790:

Github user rmetzger commented on the pull request:
https://github.com/apache/flink/pull/1213#issuecomment-145577205

I killed the AM using "kill" from the command line, and all the TaskManagers were shutting down.
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14942870#comment-14942870 ] ASF GitHub Bot commented on FLINK-2790:

Github user rmetzger commented on the pull request:
https://github.com/apache/flink/pull/1213#issuecomment-145404233

I've looked over the code a bit, and the changes all look good. I've tested the code on a Hadoop 2.6.0 cluster and the YARN session was restarting properly. But I think that's not the expected behavior. We actually want the TMs to stay alive and reconnect to the new AM.
[jira] [Commented] (FLINK-2790) Add high availability support for Yarn
[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14941081#comment-14941081 ] ASF GitHub Bot commented on FLINK-2790:

GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/1213

[FLINK-2790] [yarn] [ha] Adds high availability support for Yarn

Adds high availability support for Yarn by exploiting Yarn's functionality to restart a failed application master. Depending on the Hadoop version, the behaviour is an increasing superset of the preceding version's behaviour.

### 2.3.0 <= version < 2.4.0

* Sets the number of application attempts to the configuration value `yarn.application-attempts`. This means that Yarn makes up to `yarn.application-attempts` attempts to run the application before it fails the application. In case of an application master failure, all other TaskManager containers are killed as well.

### 2.4.0 <= version < 2.6.0

* Additionally, containers are kept across application attempts. This avoids killing the TaskManager containers when the application master fails, which makes the startup faster and saves the user from waiting to obtain the container resources again.

### 2.6.0 <= version

* Sets the attempt failures validity interval to the Akka timeout value. With this interval, an application is only killed after the system has seen the maximum number of application attempts within one interval. This prevents a long-running job from depleting its application attempts.

This PR also refactors the different Yarn components to allow the start-up of testing actors within Yarn. Furthermore, the `JobManager` start-up logic is slightly extended to allow code reuse in the `ApplicationMasterBase`.

The HA functionality is tested via the `YARNHighAvailabilityITCase`, in which the application master is killed multiple times. Each time, it is checked that the single TaskManager successfully reconnects to the newly started `YarnJobManager`. In case of version `2.3.0`, the `TaskManager` is restarted.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink yarnHA

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1213.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1213

commit 1a18172ae69eb576638704f8e143a921aa8630d5
Author: Till Rohrmann
Date: 2015-09-01T14:35:48Z

    [FLINK-2790] [yarn] [ha] Adds high availability support for Yarn

commit 5359676556d16610303d4f36fcbe5320ef4e6643
Author: Till Rohrmann
Date: 2015-09-23T15:42:57Z

    Refactors JobManager's start actors method to be reusable

commit d6a47cd8ad265c5122d1a67c09773cbc5a491261
Author: Till Rohrmann
Date: 2015-09-24T12:55:18Z

    Yarn refactoring to introduce yarn testing functionality

commit f9578f136dd41cd9829d712f7c62a59c9ea8e194
Author: Till Rohrmann
Date: 2015-09-28T16:21:30Z

    Added support for testing yarn cluster.

    Extracted JobManager's and TaskManager's testing messages into stackable traits.

commit dbfa16438ad9d7d61e8d1a582c8cd1de9352078e
Author: Till Rohrmann
Date: 2015-09-29T15:05:01Z

    Implemented YarnHighAvailabilityITCase using Akka messages for synchronization.
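The version matrix in the PR description can be summarised as a small model. This is a sketch, not Flink's code: `YarnHaSettings` and `AttemptFailureWindow` are hypothetical names, versions are passed as major/minor integers, and the window check is a simplified reading of the PR's one-sentence explanation rather than the ResourceManager's actual logic. The setters named in the comments (`setMaxAppAttempts`, `setKeepContainersAcrossApplicationAttempts`, `setAttemptFailuresValidityInterval` on `ApplicationSubmissionContext`) are real Hadoop methods; the last two only exist from Hadoop 2.4.0 and 2.6.0 respectively.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of the version-dependent settings listed in the PR description.
// It only computes values; nothing here talks to Hadoop.
final class YarnHaSettings {
    final int maxAppAttempts;                     // setMaxAppAttempts, used from 2.3.0 on
    final boolean keepContainers;                 // setKeepContainersAcrossApplicationAttempts, 2.4.0+
    final long attemptFailuresValidityIntervalMs; // setAttemptFailuresValidityInterval, 2.6.0+; -1 = unset

    private YarnHaSettings(int maxAppAttempts, boolean keepContainers, long intervalMs) {
        this.maxAppAttempts = maxAppAttempts;
        this.keepContainers = keepContainers;
        this.attemptFailuresValidityIntervalMs = intervalMs;
    }

    private static boolean atLeast(int major, int minor, int reqMajor, int reqMinor) {
        return major > reqMajor || (major == reqMajor && minor >= reqMinor);
    }

    // Assumes Hadoop 2.3.0 or newer, the oldest range the PR description covers.
    static YarnHaSettings forHadoop(int major, int minor, int applicationAttempts, long akkaTimeoutMs) {
        boolean keep = atLeast(major, minor, 2, 4);
        long interval = atLeast(major, minor, 2, 6) ? akkaTimeoutMs : -1L;
        return new YarnHaSettings(applicationAttempts, keep, interval);
    }
}

// Simplified model of the attempt failures validity interval: the application fails only
// once maxAttempts failures fall inside one interval; older failures stop counting.
// Treating a failure exactly intervalMs old as still inside the window is an assumption.
final class AttemptFailureWindow {
    private final int maxAttempts;
    private final long intervalMs;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    AttemptFailureWindow(int maxAttempts, long intervalMs) {
        this.maxAttempts = maxAttempts;
        this.intervalMs = intervalMs;
    }

    // Records a failed attempt at nowMs and returns true if the application should now fail.
    boolean recordFailure(long nowMs) {
        failureTimes.addLast(nowMs);
        while (nowMs - failureTimes.peekFirst() > intervalMs) {
            failureTimes.removeFirst();
        }
        return failureTimes.size() >= maxAttempts;
    }
}
```

With `maxAttempts = 2` and a one-second interval, failures at t = 0 s and t = 5 s do not fail the application because the first one has aged out of the window, while a further failure at t = 5.5 s does. This is the property the PR relies on to keep long-running jobs from slowly using up their attempts.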