[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14948584#comment-14948584
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1213


> Add high availability support for Yarn
> --
>
> Key: FLINK-2790
> URL: https://issues.apache.org/jira/browse/FLINK-2790
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Reporter: Till Rohrmann
> Fix For: 0.10
>
>
> Add master high availability support for Yarn. The idea is to let Yarn
> restart a failed application master in a new container. For that, we set the
> number of application retries to something greater than 1.
> From version 2.4.0 onwards, it is possible to reuse already started
> containers for the TaskManagers, thus avoiding unnecessary restart delays.
> From version 2.6.0 onwards, it is possible to specify an interval within which
> the number of application attempts has to be exceeded in order to fail the
> job. This prevents long-running jobs from eventually depleting all
> available application attempts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946727#comment-14946727
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-146170086
  
I have a dependency problem with Curator leading to:

```bash
ERROR org.apache.flink.runtime.jobmanager.JobManager  - Error while starting up JobManager
java.lang.NoSuchMethodError: org.apache.curator.utils.PathUtils.validatePath(Ljava/lang/String;)Ljava/lang/String;
	at org.apache.curator.framework.imps.NamespaceImpl.<init>(NamespaceImpl.java:37)
	at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:113)
	at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:124)
	at org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:83)
	at org.apache.flink.runtime.util.ZooKeeperUtils.createLeaderElectionService(ZooKeeperUtils.java:145)
	at org.apache.flink.runtime.util.LeaderElectionUtils.createLeaderElectionService(LeaderElectionUtils.java:52)
	at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:1595)
	at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:1672)
	at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:1629)
	at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:1307)
	at org.apache.flink.yarn.ApplicationMasterBase.runAction(ApplicationMasterBase.scala:127)
	at org.apache.flink.yarn.ApplicationMasterBase$$anon$1.run(ApplicationMasterBase.scala:76)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:360)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1608)
	at org.apache.flink.yarn.ApplicationMasterBase.run(ApplicationMasterBase.scala:74)
	at org.apache.flink.yarn.ApplicationMaster$.main(ApplicationMaster.scala:35)
	at org.apache.flink.yarn.ApplicationMaster.main(ApplicationMaster.scala)
```

Our application master class path:
```bash
13:45:50,795 DEBUG org.apache.flink.yarn.ApplicationMaster  - All environment variables: 
{PATH=/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/opt/X11/bin, 
HADOOP_CONF_DIR=/Users/ufuk/Downloads/hadoop-2.6.0/etc/hadoop, 
MAX_APP_ATTEMPTS=2, HADOOP_SECURE_DN_PID_DIR=, HADOOP_PID_DIR=, 
MAIL=/var/mail/ufuk, 
LD_LIBRARY_PATH=:/Users/ufuk/Downloads/hadoop-2.6.0/lib/native:/Users/ufuk/Downloads/hadoop-2.6.0/lib/native,
 LOGNAME=ufuk, JVM_PID=13307, _DETACHED=false, 
PWD=/tmp/hadoop-ufuk/nm-local-dir/usercache/ufuk/appcache/application_1444217271951_0004/container_1444217271951_0004_01_01,
 HADOOP_YARN_USER=yarn, HADOOP_PREFIX=/Users/ufuk/Downloads/hadoop-2.6.0, 
LOCAL_DIRS=/tmp/hadoop-ufuk/nm-local-dir/usercache/ufuk/appcache/application_1444217271951_0004,
 YARN_IDENT_STRING=ufuk, HADOOP_SECURE_DN_LOG_DIR=/, SHELL=/bin/zsh, 
YARN_CONF_DIR=/Users/ufuk/Downloads/hadoop-2.6.0/etc/hadoop, 
JAVA_MAIN_CLASS_10305=org.apache.hadoop.yarn.server.nodemanager.NodeManager, 
LOG_DIRS=/Users/ufuk/Downloads/hadoop-2.6.0/logs/userlogs/application_1444217271951_0004/container_1444217271951_0004_01_01,
 
_CLIENT_SHIP_FILES=file:/Users/ufuk/.flink/application_1444217271951_0004/flink-python-0.10-SNAPSHOT.jar,file:/Users/ufuk/.flink/application_1444217271951_0004/log4j-1.2.17.jar,file:/Users/ufuk/.flink/application_1444217271951_0004/slf4j-log4j12-1.7.7.jar,file:/Users/ufuk/.flink/application_1444217271951_0004/logback.xml,file:/Users/ufuk/.flink/application_1444217271951_0004/log4j.properties,
 _CLIENT_USERNAME=ufuk, HADOOP_YARN_HOME=/Users/ufuk/Downloads/hadoop-2.6.0, 
TMPDIR=/var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/, 
HADOOP_DATANODE_OPTS=-Dhadoop.security.logger=ERROR,RFAS 
-Dhadoop.security.logger=ERROR,RFAS , 
HADOOP_SECONDARYNAMENODE_OPTS=-Dhadoop.security.logger=INFO,RFAS 
-Dhdfs.audit.logger=INFO,NullAppender -Dhadoop.security.logger=INFO,RFAS 
-Dhdfs.audit.logger=INFO,NullAppender , 
_FLINK_JAR_PATH=file:/Users/ufuk/.flink/application_1444217271951_0004/flink-dist-0.10-SNAPSHOT.jar,
 __CF_USER_TEXT_ENCODING=0x1F5:0x0:0x0, LC_CTYPE=UTF-8, _CLIENT_TM_COUNT=2, 
_CLIENT_TM_MEMORY=1024, SHLVL=3, HADOOP_IDENT_STRING=ufuk, 
YARN_ROOT_LOGGER=INFO,RFA, _SLOTS=-1, _CLIENT_HOME_DIR=file:/Users/ufuk, 
APP_SUBMIT_TIME_ENV=1444218347305, NM_HOST=192.168.178.69, 
_APP_ID=application_1444217271951_0004, 
YARN_LOGFILE=yarn-ufuk-nodemanager-vinci.local.log, HADOOP_SECURE_DN_USER=, 
```

[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946643#comment-14946643
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41374549
  
--- Diff: 
flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
 ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, ActorRef}
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
+RequestLeaderSessionID}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
+AcknowledgeRegistration}
+import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, TaskInFinalState}
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a TaskManager with messages for testing purposes.
+  *
--- End diff --

empty line




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946612#comment-14946612
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-146136443
  
I found the issue. My code did not properly invoke the methods 
`setKeepContainersAcrossApplicationAttempts` and 
`setAttemptFailuresValidityInterval` via reflection. With the fix, already started 
containers are retained. I tested it with Yarn 2.7.1.
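
For context, both calls configure YARN's `ApplicationSubmissionContext`. The following is only a minimal sketch of how such a submission context is typically set up with the plain YARN client API when compiling against Hadoop 2.6+; the variable names and values are illustrative and not the PR's actual code:

```java
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnHaSubmissionSketch {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();

        // Let YARN restart a failed application master up to this many times
        // (corresponds to the yarn.application-attempts setting).
        appContext.setMaxAppAttempts(10);

        // Hadoop 2.4+: keep the TaskManager containers alive across AM restarts.
        appContext.setKeepContainersAcrossApplicationAttempts(true);

        // Hadoop 2.6+: only attempts failing within this window (ms) count towards
        // the limit, so long-running jobs do not slowly exhaust their attempts.
        appContext.setAttemptFailuresValidityInterval(60_000L);

        // ... set the ContainerLaunchContext, resources, queue, etc., then:
        // yarnClient.submitApplication(appContext);
        yarnClient.stop();
    }
}
```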




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946613#comment-14946613
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-146136479
  
Good catch @rmetzger :-)




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946657#comment-14946657
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-146150504
  
The code looks good. :-) I like the changes around the testing classes and 
the changes to the retrieval utils (that one I've looked into in more detail). 
I didn't check the YARN logic in detail though. I'm going to try it out now 
locally and report back. I expect that to work just fine and then we can safely 
merge this I think. :)




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946640#comment-14946640
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41374461
  
--- Diff: 
flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
 ---
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, Cancellable, ActorRef}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purpose.
+  *
--- End diff --

empty line




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946642#comment-14946642
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41374493
  
--- Diff: 
flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
 ---
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, Cancellable, ActorRef}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purpose.
+  *
+  */
+trait TestingJobManagerLike extends FlinkActor {
+  that: JobManager =>
+
+  import scala.collection.JavaConverters._
+  import context._
+
+  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+
+  val waitForAllVerticesToBeRunningOrFinished =
+scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+
+  var periodicCheck: Option[Cancellable] = None
+
+  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
+collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
+
+  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
+
+  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
+
+  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
+new Ordering[(Int, ActorRef)] {
+  override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
+})
+
+  var disconnectDisabled = false
+
+  abstract override def handleMessage: Receive = {
+handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+case Alive => sender() ! Acknowledge
+
+case RequestExecutionGraph(jobID) =>
+  currentJobs.get(jobID) match {
+case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
+  ExecutionGraphFound(
+jobID,
+executionGraph)
+)
+
+case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
+  }
+
+case WaitForAllVerticesToBeRunning(jobID) =>
+  if(checkIfAllVerticesRunning(jobID)){
+sender() ! decorateMessage(AllVerticesRunning(jobID))
+  }else{
+val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
+waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
+
+

[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946651#comment-14946651
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41374754
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java ---
@@ -0,0 +1,867 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+* All classes in this package contain code taken from
+* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+* and
+* https://github.com/hortonworks/simple-yarn-app
+* and
+* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+*
+* The Flink jar is uploaded to HDFS by this client.
+* The application master and all the TaskManager containers get the jar file downloaded
+* by YARN into their local fs.
+*
+*/
+public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
+
+   /**
+* Constants,
+* all starting with ENV_ are used as environment variables to pass values from the Client
+* to the Application Master.
+*/
+   public final static String 

[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946825#comment-14946825
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41387623
  
--- Diff: 
flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
 ---
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, Cancellable, ActorRef}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purpose.
+  *
+  */
+trait TestingJobManagerLike extends FlinkActor {
+  that: JobManager =>
+
+  import scala.collection.JavaConverters._
+  import context._
+
+  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+
+  val waitForAllVerticesToBeRunningOrFinished =
+scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+
+  var periodicCheck: Option[Cancellable] = None
+
+  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
+collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
+
+  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
+
+  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
+
+  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
+new Ordering[(Int, ActorRef)] {
+  override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
+})
+
+  var disconnectDisabled = false
+
+  abstract override def handleMessage: Receive = {
+handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+case Alive => sender() ! Acknowledge
+
+case RequestExecutionGraph(jobID) =>
+  currentJobs.get(jobID) match {
+case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
+  ExecutionGraphFound(
+jobID,
+executionGraph)
+)
+
+case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
+  }
+
+case WaitForAllVerticesToBeRunning(jobID) =>
+  if(checkIfAllVerticesRunning(jobID)){
+sender() ! decorateMessage(AllVerticesRunning(jobID))
+  }else{
+val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
+waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
+
+

[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946824#comment-14946824
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41387583
  
--- Diff: 
flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
 ---
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, Cancellable, ActorRef}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purpose.
+  *
--- End diff --

Let's see whether it is still recognized as a ScalaDoc comment




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946822#comment-14946822
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41387526
  
--- Diff: 
flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 ---
@@ -103,18 +103,17 @@ class TestingCluster(
   instanceManager,
   scheduler,
   libraryCacheManager,
-  _,
   executionRetries,
   delayBetweenRetries,
   timeout,
   archiveCount,
-  leaderElectionService) = JobManager.createJobManagerComponents(config)
+  leaderElectionService) = JobManager.createJobManagerComponents(
+  config,
--- End diff --

Good catch




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946827#comment-14946827
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41387700
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java ---
@@ -0,0 +1,867 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+* All classes in this package contain code taken from
+* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+* and
+* https://github.com/hortonworks/simple-yarn-app
+* and
+* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+*
+* The Flink jar is uploaded to HDFS by this client.
+* The application master and all the TaskManager containers get the jar file downloaded
+* by YARN into their local fs.
+*
+*/
+public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
+
+   /**
+* Constants,
+* all starting with ENV_ are used as environment variables to pass values from the Client
+* to the Application Master.
+*/
+   public final static 

[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14946826#comment-14946826
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1213#discussion_r41387670
  
--- Diff: 
flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
 ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, ActorRef}
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
+RequestLeaderSessionID}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
+AcknowledgeRegistration}
+import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, TaskInFinalState}
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a TaskManager with messages for testing purposes.
+  *
--- End diff --

Fixed




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14947033#comment-14947033
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-146232148
  
Smooth experience now. Works as expected with vanilla YARN 2.6.0.

+1 to merge.




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14947059#comment-14947059
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-146239375
  
Thanks for the review @uce. I addressed your comments.

I fixed the problem with Hadoop 2.6.0 by relocating Flink's Curator 
dependency to `org.apache.flink.shaded.org.apache.curator`. Furthermore, I 
bumped `flink-shaded-curator`'s Guava version to Flink's Guava version so that 
we don't include too many different Guava versions in the resulting 
`flink-dist.jar`. 
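
For reference, a dependency relocation of this kind is usually configured via the `maven-shade-plugin`. The snippet below is only a generic sketch of such a relocation, not the actual `flink-shaded-curator` pom:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- Move Curator into a Flink-specific namespace so it cannot clash with
                 the (older) Curator classes that Hadoop/YARN puts on the classpath. -->
            <pattern>org.apache.curator</pattern>
            <shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```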




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-06 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944675#comment-14944675
 ] 

Till Rohrmann commented on FLINK-2790:
--

What did the logs say?

I'll try to reproduce it.






[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943008#comment-14943008
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-145450115
  
I think TMs are only kept alive if their containers have been properly 
started. If the AM happens to die while the TM containers are still starting up, I 
think they will be terminated as well. Another question is: how did you kill the 
AM, and what do you mean by "[...] restarting properly. But I think that's not 
the expected behavior"? 




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943014#comment-14943014
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-145451351
  
Looks like a lot of work to figure out the different version behaviours. 
Good job and thanks for the clear explanation. :)

I guess that by "not restarting properly" Robert meant that the TMs were 
restarted as well.

How does the way you kill the AM affect recovery?

I will try this out later today.




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943029#comment-14943029
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-145456411
  
I was just curious whether he killed them gracefully with a `PoisonPill` or 
via killing the JVM process.




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943559#comment-14943559
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-145577205
  
I killed the AM using "kill" from the command line, and all the 
TaskManagers shut down as well.





[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14942870#comment-14942870
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-145404233
  
I've looked a bit over the code and it all looks like good changes.

I've tested the code on a Hadoop 2.6.0 cluster and the YARN session was 
restarting properly. But I think that's not the expected behavior.
We actually want the TMs to stay alive and reconnect to the new AM.




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14941081#comment-14941081
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/1213

[FLINK-2790] [yarn] [ha] Adds high availability support for Yarn

Adds high availability support for Yarn by exploiting Yarn's ability to restart a 
failed application master. Depending on the Hadoop version, the behaviour is an 
increasing superset of the preceding version's functionality:

### 2.3.0 <= version < 2.4.0

* Sets the number of application attempts to the configuration value 
`yarn.application-attempts`. This means that the application can be restarted 
`yarn.application-attempts` times before YARN fails the application. In case of 
an application master failure, all TaskManager containers are killed as well.

### 2.4.0 <= version < 2.6.0

* Additionally, enables keeping containers across application attempts. This 
avoids killing the TaskManager containers in the case of an application master 
failure, so the restart is faster and the user does not have to wait to obtain 
the container resources again.

### 2.6.0 <= version

* Sets the attempt failures validity interval to the Akka timeout value. The 
attempt failures validity interval means that an application is only failed after 
the maximum number of application attempts has been exceeded within one such 
interval. This prevents a long-running job from eventually depleting its 
application attempts (see the sketch after this list).
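
Since `setKeepContainersAcrossApplicationAttempts` and `setAttemptFailuresValidityInterval` do not exist in the older Hadoop versions the client must still compile against, version-dependent settings like these are typically applied via reflection. The following is only a rough, hypothetical sketch of that pattern (helper and variable names are made up and not the PR's actual code):

```java
import java.lang.reflect.Method;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;

public final class VersionDependentYarnSettings {

    /** Applies the HA-related settings, skipping those the running Hadoop version lacks. */
    public static void configure(ApplicationSubmissionContext appContext,
                                 int maxAttempts,
                                 long failuresValidityIntervalMs) {
        // Available on all supported versions: let YARN restart a failed application master.
        appContext.setMaxAppAttempts(maxAttempts);

        // Hadoop >= 2.4: keep TaskManager containers across application attempts.
        invokeIfPresent(appContext, "setKeepContainersAcrossApplicationAttempts",
                new Class<?>[] { boolean.class }, new Object[] { true });

        // Hadoop >= 2.6: only attempts within this interval count against the limit.
        invokeIfPresent(appContext, "setAttemptFailuresValidityInterval",
                new Class<?>[] { long.class }, new Object[] { failuresValidityIntervalMs });
    }

    private static void invokeIfPresent(ApplicationSubmissionContext context, String name,
                                        Class<?>[] parameterTypes, Object[] args) {
        try {
            Method method = ApplicationSubmissionContext.class.getMethod(name, parameterTypes);
            method.invoke(context, args);
        } catch (NoSuchMethodException e) {
            // Older Hadoop version: the feature is simply not available, so skip it.
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not invoke " + name, e);
        }
    }
}
```

With such a guard, the same client can run against Hadoop 2.3, 2.4, and 2.6+ and simply enables whatever the cluster supports.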

This PR also refactors the different Yarn components to allow the start-up 
of testing actors within Yarn. Furthermore, the `JobManager` start-up logic is 
slightly extended to allow code reuse in the `ApplicationMasterBase`.

The HA functionality is tested via the `YARNHighAvailabilityITCase`, where 
the application master is killed multiple times. Each time, it is checked that the 
single TaskManager successfully reconnects to the newly started 
`YarnJobManager`. In the case of version `2.3.0`, the `TaskManager` is restarted.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink yarnHA

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1213.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1213


commit 1a18172ae69eb576638704f8e143a921aa8630d5
Author: Till Rohrmann 
Date:   2015-09-01T14:35:48Z

[FLINK-2790] [yarn] [ha] Adds high availability support for Yarn

commit 5359676556d16610303d4f36fcbe5320ef4e6643
Author: Till Rohrmann 
Date:   2015-09-23T15:42:57Z

Refactors JobManager's start actors method to be reusable

commit d6a47cd8ad265c5122d1a67c09773cbc5a491261
Author: Till Rohrmann 
Date:   2015-09-24T12:55:18Z

Yarn refactoring to introduce yarn testing functionality

commit f9578f136dd41cd9829d712f7c62a59c9ea8e194
Author: Till Rohrmann 
Date:   2015-09-28T16:21:30Z

Added support for testing yarn cluster. Extracted JobManager's and 
TaskManager's testing messages into stackable traits.

commit dbfa16438ad9d7d61e8d1a582c8cd1de9352078e
Author: Till Rohrmann 
Date:   2015-09-29T15:05:01Z

Implemented YarnHighAvailabilityITCase using Akka messages for 
synchronization.



