[2/2] flink git commit: [FLINK-1629][FLINK-1630][FLINK-1547] Add option to start Flink on YARN in a detached mode. YARN container reallocation.
[FLINK-1629][FLINK-1630][FLINK-1547] Add option to start Flink on YARN in a detached mode. YARN container reallocation.

This commit changes the following:
[FLINK-1629]: Users can now submit jobs or start YARN sessions on YARN in a fire-and-forget manner (detached mode).
[FLINK-1630]: Flink now reallocates failed YARN containers during the lifetime of a YARN session.
[FLINK-1547]: Users can now specify whether the ApplicationMaster (= the JobManager = the entire YARN session) should be restarted on failure, and how often. After the first restart, the session behaves like a detached session. There is no backup of state between the old and the new AM.

The whole resource negotiation process between the ResourceManager (RM) and the ApplicationMaster (AM) has been reworked. Flink is now much more flexible when requesting new containers and when giving back unneeded containers.

A new test case covers the container restart. It also verifies that the web frontend is properly started, that the log files can be accessed, and that the configuration values the user specifies when starting the YARN session are visible in the web frontend.
This closes #468


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/13bb21b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/13bb21b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/13bb21b1

Branch: refs/heads/master
Commit: 13bb21b1bd83b4f9e434f735ba7517ffd03478d3
Parents: fd9ca4d
Author: Robert Metzger rmetz...@apache.org
Authored: Thu Mar 5 15:03:05 2015 +0100
Committer: Robert Metzger rmetz...@apache.org
Committed: Thu Mar 12 11:33:37 2015 +0100

--
 docs/yarn_setup.md | 51 +-
 .../flink/client/FlinkYarnSessionCli.java | 38 +-
 .../flink/configuration/ConfigConstants.java | 29 ++
 .../runtime/jobmanager/web/WebInfoServer.java | 23 +-
 .../flink/runtime/util/SignalHandler.java | 80 +++
 .../runtime/yarn/AbstractFlinkYarnClient.java | 2 +
 .../runtime/yarn/AbstractFlinkYarnCluster.java | 2 +
 .../runtime/taskmanager/TaskManagerTest.java | 1 -
 flink-yarn-tests/pom.xml | 6 +
 .../java/org/apache/flink/yarn/UtilsTest.java | 51 ++
 .../YARNSessionCapacitySchedulerITCase.java | 16 +-
 .../flink/yarn/YARNSessionFIFOITCase.java | 293 +--
 .../org/apache/flink/yarn/YarnTestBase.java | 141 +++--
 .../src/main/resources/log4j-test.properties | 15 +-
 .../org/apache/flink/yarn/FlinkYarnClient.java | 21 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 139 +++--
 .../yarn/appMaster/YarnTaskManagerRunner.java | 1 +
 .../apache/flink/yarn/ApplicationClient.scala | 9 +-
 .../apache/flink/yarn/ApplicationMaster.scala | 19 +-
 .../flink/yarn/ApplicationMasterActor.scala | 509 +++
 .../scala/org/apache/flink/yarn/Messages.scala | 4 +-
 .../org/apache/flink/yarn/YarnJobManager.scala | 330
 .../org/apache/flink/yarn/YarnTaskManager.scala | 7 +-
 tools/travis_mvn_watchdog.sh | 14 +
 24 files changed, 1291 insertions(+), 510 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/docs/yarn_setup.md
--
diff --git a/docs/yarn_setup.md b/docs/yarn_setup.md
index cbd6759..5c53f8b 100644
--- a/docs/yarn_setup.md
+++ b/docs/yarn_setup.md
@@ -23,19 +23,30 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## In a Nutshell
+## Quickstart: Start a long-running Flink cluster on YARN
 
-Start YARN session with 4 Task Managers (each with 4 GB of Heapspace):
+Start a YARN session with 4 Task Managers (each with 4 GB of Heapspace):
 
 ~~~bash
 wget {{ site.FLINK_WGET_URL_YARN_STABLE }}
-tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2-yarn.tgz
-cd flink-yarn-{{ site.FLINK_VERSION_SHORT }}/
+tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2.tgz
+cd flink-{{ site.FLINK_VERSION_SHORT }}/
 ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
 ~~~
 
 Specify the `-s` flag for the number of processing slots per Task Manager. We recommend to set the number of slots to the number of processors per machine.
 
+Once the session has been started, you can submit jobs to the cluster using the `./bin/flink` tool.
+
+## Quickstart: Run a Flink job on YARN
+
+~~~bash
+wget {{ site.FLINK_WGET_URL_YARN_STABLE }}
+tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2.tgz
+cd flink-{{ site.FLINK_VERSION_SHORT }}/
+./bin/flink -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/flink-java-examples-{{ site.FLINK_VERSION_SHORT }}-WordCount.jar
+~~~
+
 ## Apache Flink on Hadoop YARN using a YARN Session
 
 Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management framework. It allows to run various distributed applications
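For sizing the quickstart session in the docs change above: `-n 4 -jm 1024 -tm 4096` asks YARN for one JobManager container of 1024 MB plus four TaskManager containers of 4096 MB each, and the `-s` flag multiplies with the number of TaskManagers to give the total number of processing slots. The Java helper below is a hypothetical back-of-the-envelope sketch (not part of Flink); it assumes exactly one container per JobManager/TaskManager and ignores that YARN may round each request up to its minimum allocation size.

```java
/**
 * Hypothetical sizing helper for a Flink YARN session (not Flink API).
 * Assumes one JobManager container plus one container per TaskManager,
 * with no rounding by the YARN scheduler.
 */
final class YarnSessionSizing {

    private YarnSessionSizing() {}

    /** Total container memory in MB requested for the -n, -jm and -tm values. */
    static long totalMemoryMb(int numTaskManagers, int jobManagerMb, int taskManagerMb) {
        if (numTaskManagers < 1 || jobManagerMb < 1 || taskManagerMb < 1) {
            throw new IllegalArgumentException("all values must be positive");
        }
        // Widen to long so that large clusters cannot overflow an int.
        return (long) jobManagerMb + (long) numTaskManagers * taskManagerMb;
    }

    /** Total processing slots when every TaskManager offers -s slots. */
    static int totalSlots(int numTaskManagers, int slotsPerTaskManager) {
        return numTaskManagers * slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
        System.out.println(totalMemoryMb(4, 1024, 4096) + " MB"); // 17408 MB
        // Same session started with -s 8 (e.g. 8 processors per machine)
        System.out.println(totalSlots(4, 8) + " slots"); // 32 slots
    }
}
```

The same arithmetic applies to the detached `./bin/flink -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096` variant, since it requests the same container sizes.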
[1/2] flink git commit: [FLINK-1629][FLINK-1630][FLINK-1547] Add option to start Flink on YARN in a detached mode. YARN container reallocation.
Repository: flink
Updated Branches:
  refs/heads/master fd9ca4def -> 13bb21b1b


http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
--
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
new file mode 100644
index 000..6a6a6e4
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.Collections
+
+import akka.actor.ActorRef
+import org.apache.flink.configuration.ConfigConstants
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.Messages.Acknowledge
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
+import org.apache.flink.yarn.Messages._
+import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.{NMClient, AMRMClient}
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.YarnException
+import org.apache.hadoop.yarn.util.Records
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Try
+
+
+trait ApplicationMasterActor extends ActorLogMessages {
+  that: JobManager =>
+
+  import context._
+  import scala.collection.JavaConverters._
+
+  val FAST_YARN_HEARTBEAT_DELAY: FiniteDuration = 500 milliseconds
+  val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
+  val YARN_HEARTBEAT_DELAY: FiniteDuration =
+    if(configuration.getString(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, null) == null) {
+      DEFAULT_YARN_HEARTBEAT_DELAY
+    } else {
+      FiniteDuration(
+        configuration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5), SECONDS)
+    }
+
+  var rmClientOption: Option[AMRMClient[ContainerRequest]] = None
+  var nmClientOption: Option[NMClient] = None
+  var messageListener:Option[ActorRef] = None
+  var containerLaunchContext: Option[ContainerLaunchContext] = None
+
+  var runningContainers = 0 // number of currently running containers
+  var failedContainers = 0 // failed container count
+  var numTaskManager = 0 // the requested number of TMs
+  var maxFailedContainers = 0
+  var containersLaunched = 0
+  var numPendingRequests = 0 // number of currently pending container allocation requests.
+
+  var memoryPerTaskManager = 0
+
+  // list of containers available for starting
+  var allocatedContainersList: mutable.MutableList[Container] = new mutable.MutableList[Container]
+  var runningContainersList: mutable.MutableList[Container] = new mutable.MutableList[Container]
+
+
+  abstract override def receiveWithLogMessages: Receive = {
+    receiveYarnMessages orElse super.receiveWithLogMessages
+  }
+
+  def receiveYarnMessages: Receive = {
+    case StopYarnSession(status, diag) =>
+      log.info("Stopping YARN JobManager with status {} and diagnostic {}.", status, diag)
+
+      instanceManager.getAllRegisteredInstances.asScala foreach {
+        instance =>
+          instance.getTaskManager ! StopYarnSession(status, diag)
+      }
+
+      rmClientOption foreach {
+        rmClient =>
+          Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover{
+            case t: Throwable => log.error(t, "Could not unregister the application master.")
+          }
+
+          Try(rmClient.close()).recover{
+            case t:Throwable =>
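The reworked resource negotiation described in the commit message (reallocating failed containers, giving back unneeded ones, and giving up after too many failures) can be modeled with plain counters like the `numTaskManager`, `runningContainers`, `numPendingRequests`, `failedContainers` and `maxFailedContainers` fields declared above. The Java class below is a simplified, hypothetical sketch of that bookkeeping, not the code from this commit (whose message handlers are cut off above). It leaves out the actual `AMRMClient`/`NMClient` calls, and treating the session as failed once more than `maxFailedContainers` containers have failed is an assumption.

```java
/**
 * Hypothetical, simplified model of ApplicationMaster container bookkeeping.
 * Field names mirror ApplicationMasterActor; the logic is an assumption,
 * not Flink's implementation.
 */
final class ContainerBookkeeping {
    private final int numTaskManager;      // requested number of TaskManagers
    private final int maxFailedContainers; // tolerated container failures
    private int runningContainers = 0;     // containers currently running a TaskManager
    private int numPendingRequests = 0;    // container requests not yet answered
    private int failedContainers = 0;      // containers that have failed so far

    ContainerBookkeeping(int numTaskManager, int maxFailedContainers) {
        this.numTaskManager = numTaskManager;
        this.maxFailedContainers = maxFailedContainers;
    }

    /** New container requests needed so that running + pending reaches the target. */
    int containersToRequest() {
        int toRequest = Math.max(0, numTaskManager - runningContainers - numPendingRequests);
        numPendingRequests += toRequest;
        return toRequest;
    }

    /** Accepts newly allocated containers; returns how many are surplus and should be released. */
    int onContainersAllocated(int allocated) {
        int used = Math.min(allocated, Math.max(0, numTaskManager - runningContainers));
        runningContainers += used;
        numPendingRequests = Math.max(0, numPendingRequests - allocated);
        return allocated - used;
    }

    /** Records a failed container; returns false once the failure budget is exceeded. */
    boolean onContainerFailed() {
        runningContainers = Math.max(0, runningContainers - 1);
        failedContainers++;
        return failedContainers <= maxFailedContainers;
    }

    int runningContainers() {
        return runningContainers;
    }

    public static void main(String[] args) {
        ContainerBookkeeping am = new ContainerBookkeeping(4, 2);
        System.out.println("request " + am.containersToRequest());    // request 4
        System.out.println("release " + am.onContainersAllocated(5)); // release 1 (surplus)
        am.onContainerFailed();                                       // one TaskManager container died
        System.out.println("request " + am.containersToRequest());    // request 1 (reallocation)
    }
}
```

Keeping pending requests separate from running containers is what prevents the sketch from over-requesting while the ResourceManager has not yet answered; in a real AM the surplus count would be handed back to YARN rather than just returned.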
flink git commit: Remove unused imports from RichMapPartitionFunction. Wrap way too long statements in NetworkBufferPool.
Repository: flink
Updated Branches:
  refs/heads/master 92e809d1b -> 378b487c4


Remove unused imports from RichMapPartitionFunction. Wrap way too long statements in NetworkBufferPool.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/378b487c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/378b487c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/378b487c

Branch: refs/heads/master
Commit: 378b487c4372f5a4cd51e356361b027e66d6225f
Parents: 92e809d
Author: Henry Saputra henry.sapu...@gmail.com
Authored: Thu Mar 12 15:50:16 2015 -0700
Committer: Henry Saputra henry.sapu...@gmail.com
Committed: Thu Mar 12 15:50:16 2015 -0700

--
 .../flink/api/common/functions/RichMapPartitionFunction.java | 3 ---
 .../flink/runtime/io/network/buffer/NetworkBufferPool.java | 6 --
 2 files changed, 4 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/378b487c/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
index 4c288ca..d8b7e9b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.api.common.functions;
 
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
 /**


http://git-wip-us.apache.org/repos/asf/flink/blob/378b487c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 8de7c36..3a6dbf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -74,12 +74,14 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
 			int missingMb = requiredMb - allocatedMb;
 
-			throw new OutOfMemoryError("Could not allocate enough memory segments for GlobalBufferPool (required (Mb): " + requiredMb + ", allocated (Mb): " + allocatedMb + ", missing (Mb): " + missingMb + ").");
+			throw new OutOfMemoryError("Could not allocate enough memory segments for GlobalBufferPool (required (Mb): " +
+					requiredMb + ", allocated (Mb): " + allocatedMb + ", missing (Mb): " + missingMb + ").");
 		}
 
 		int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
 
-		LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).", allocatedMb, availableMemorySegments.size(), segmentSize);
+		LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
+				allocatedMb, availableMemorySegments.size(), segmentSize);
 	}
 
 	public MemorySegment requestMemorySegment() {
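The `>> 20` in the NetworkBufferPool context lines shifts a byte count right by 20 bits, i.e. divides it by 2^20 = 1,048,576 and rounds down, which is how the byte totals become the MB figures in the error and log messages. The small Java sketch below reproduces that arithmetic; the class and method names are hypothetical, not Flink API. It widens to `long` before multiplying: assuming `segmentSize` is an `int` like the segment count, an `int` product would overflow once the pool reaches 2 GiB (2^31 bytes).

```java
/**
 * Hypothetical sketch of the byte-to-MB arithmetic behind the
 * NetworkBufferPool messages (not Flink API).
 */
final class BufferPoolMath {

    private BufferPoolMath() {}

    /** Converts numSegments * segmentSize bytes to whole MB (2^20 bytes), rounding down. */
    static long toMb(int numSegments, int segmentSize) {
        // Widen before multiplying: an int product overflows at 2^31 bytes (2 GiB).
        return ((long) numSegments * segmentSize) >> 20;
    }

    /** Builds a message in the same shape as the wrapped OutOfMemoryError text. */
    static String shortfallMessage(long requiredMb, long allocatedMb) {
        long missingMb = requiredMb - allocatedMb;
        return "Could not allocate enough memory segments for GlobalBufferPool (required (Mb): "
                + requiredMb + ", allocated (Mb): " + allocatedMb
                + ", missing (Mb): " + missingMb + ").";
    }

    public static void main(String[] args) {
        int segmentSize = 32 * 1024; // 32 KiB per segment (example value)
        System.out.println(toMb(2048, segmentSize));  // 64
        System.out.println(toMb(65536, segmentSize)); // 2048 (an int product would overflow here)
        System.out.println(shortfallMessage(2048, 1500));
    }
}
```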