[2/2] flink git commit: [FLINK-1629][FLINK-1630][FLINK-1547] Add option to start Flink on YARN in a detached mode. YARN container reallocation.

2015-03-12 Thread rmetzger
[FLINK-1629][FLINK-1630][FLINK-1547] Add option to start Flink on YARN in a 
detached mode. YARN container reallocation.

This commit changes:
[FLINK-1629]: Users can now fire and forget jobs, or entire YARN sessions, to 
YARN. (Detached mode)
[FLINK-1630]: Flink now reallocates failed YARN containers during the 
lifetime of a YARN session.
[FLINK-1547]: Users can now specify whether they want the ApplicationMaster (= the 
JobManager = the entire YARN session) to restart on failure, and how often. 
After the first restart, the session will behave like a detached session. There 
is no backup of state between the old and the new AM. (A hedged usage sketch 
follows below.)

The whole resource negotiation process between the RM and the AM has been 
reworked.
Flink is now much more flexible when requesting new containers and when giving 
back unneeded containers.

A new test case covers the container restart. It also verifies that the 
web frontend is properly started,
that the logfile access is possible and
that the configuration values the user specifies when starting the YARN session 
are visible in the web frontend.

This closes #468


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/13bb21b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/13bb21b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/13bb21b1

Branch: refs/heads/master
Commit: 13bb21b1bd83b4f9e434f735ba7517ffd03478d3
Parents: fd9ca4d
Author: Robert Metzger rmetz...@apache.org
Authored: Thu Mar 5 15:03:05 2015 +0100
Committer: Robert Metzger rmetz...@apache.org
Committed: Thu Mar 12 11:33:37 2015 +0100

--
 docs/yarn_setup.md  |  51 +-
 .../flink/client/FlinkYarnSessionCli.java   |  38 +-
 .../flink/configuration/ConfigConstants.java|  29 ++
 .../runtime/jobmanager/web/WebInfoServer.java   |  23 +-
 .../flink/runtime/util/SignalHandler.java   |  80 +++
 .../runtime/yarn/AbstractFlinkYarnClient.java   |   2 +
 .../runtime/yarn/AbstractFlinkYarnCluster.java  |   2 +
 .../runtime/taskmanager/TaskManagerTest.java|   1 -
 flink-yarn-tests/pom.xml|   6 +
 .../java/org/apache/flink/yarn/UtilsTest.java   |  51 ++
 .../YARNSessionCapacitySchedulerITCase.java |  16 +-
 .../flink/yarn/YARNSessionFIFOITCase.java   | 293 +--
 .../org/apache/flink/yarn/YarnTestBase.java | 141 +++--
 .../src/main/resources/log4j-test.properties|  15 +-
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  21 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 139 +++--
 .../yarn/appMaster/YarnTaskManagerRunner.java   |   1 +
 .../apache/flink/yarn/ApplicationClient.scala   |   9 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |  19 +-
 .../flink/yarn/ApplicationMasterActor.scala | 509 +++
 .../scala/org/apache/flink/yarn/Messages.scala  |   4 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  | 330 
 .../org/apache/flink/yarn/YarnTaskManager.scala |   7 +-
 tools/travis_mvn_watchdog.sh|  14 +
 24 files changed, 1291 insertions(+), 510 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/docs/yarn_setup.md
--
diff --git a/docs/yarn_setup.md b/docs/yarn_setup.md
index cbd6759..5c53f8b 100644
--- a/docs/yarn_setup.md
+++ b/docs/yarn_setup.md
@@ -23,19 +23,30 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## In a Nutshell
+## Quickstart: Start a long-running Flink cluster on YARN
 
-Start YARN session with 4 Task Managers (each with 4 GB of Heapspace):
+Start a YARN session with 4 Task Managers (each with 4 GB of Heapspace):
 
 ~~~bash
 wget {{ site.FLINK_WGET_URL_YARN_STABLE }}
-tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2-yarn.tgz
-cd flink-yarn-{{ site.FLINK_VERSION_SHORT }}/
+tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2.tgz
+cd flink-{{ site.FLINK_VERSION_SHORT }}/
 ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
 ~~~
 
 Specify the `-s` flag for the number of processing slots per Task Manager. We recommend to set the number of slots to the number of processors per machine.
 
+Once the session has been started, you can submit jobs to the cluster using the `./bin/flink` tool.
+
+## Quickstart: Run a Flink job on YARN
+
+~~~bash
+wget {{ site.FLINK_WGET_URL_YARN_STABLE }}
+tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2.tgz
+cd flink-{{ site.FLINK_VERSION_SHORT }}/
+./bin/flink -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/flink-java-examples-{{ site.FLINK_VERSION_SHORT }}-WordCount.jar
+~~~
+
 ## Apache Flink on Hadoop YARN using a YARN Session
 
 Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource 
management framework. It allows to run various distributed applications 

[1/2] flink git commit: [FLINK-1629][FLINK-1630][FLINK-1547] Add option to start Flink on YARN in a detached mode. YARN container reallocation.

2015-03-12 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/master fd9ca4def -> 13bb21b1b


http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
--
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
new file mode 100644
index 0000000..6a6a6e4
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.Collections
+
+import akka.actor.ActorRef
+import org.apache.flink.configuration.ConfigConstants
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.Messages.Acknowledge
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
+import org.apache.flink.yarn.Messages._
+import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.{NMClient, AMRMClient}
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.YarnException
+import org.apache.hadoop.yarn.util.Records
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Try
+
+
+trait ApplicationMasterActor extends ActorLogMessages {
+  that: JobManager =>
+
+  import context._
+  import scala.collection.JavaConverters._
+
+  val FAST_YARN_HEARTBEAT_DELAY: FiniteDuration = 500 milliseconds
+  val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
+  val YARN_HEARTBEAT_DELAY: FiniteDuration =
+    if(configuration.getString(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, null) == null) {
+      DEFAULT_YARN_HEARTBEAT_DELAY
+    } else {
+      FiniteDuration(
+        configuration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5), SECONDS)
+    }
+
+  var rmClientOption: Option[AMRMClient[ContainerRequest]] = None
+  var nmClientOption: Option[NMClient] = None
+  var messageListener:Option[ActorRef] = None
+  var containerLaunchContext: Option[ContainerLaunchContext] = None
+
+  var runningContainers = 0 // number of currently running containers
+  var failedContainers = 0 // failed container count
+  var numTaskManager = 0 // the requested number of TMs
+  var maxFailedContainers = 0
+  var containersLaunched = 0
+  var numPendingRequests = 0 // number of currently pending container allocation requests.
+
+  var memoryPerTaskManager = 0
+
+  // list of containers available for starting
+  var allocatedContainersList: mutable.MutableList[Container] = new mutable.MutableList[Container]
+  var runningContainersList: mutable.MutableList[Container] = new mutable.MutableList[Container]
+
+
+  abstract override def receiveWithLogMessages: Receive = {
+    receiveYarnMessages orElse super.receiveWithLogMessages
+  }
+
+  def receiveYarnMessages: Receive = {
+    case StopYarnSession(status, diag) =>
+      log.info("Stopping YARN JobManager with status {} and diagnostic {}.", status, diag)
+
+      instanceManager.getAllRegisteredInstances.asScala foreach {
+        instance =>
+          instance.getTaskManager ! StopYarnSession(status, diag)
+      }
+
+      rmClientOption foreach {
+        rmClient =>
+          Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover{
+            case t: Throwable => log.error(t, "Could not unregister the application master.")
+          }
+
+          Try(rmClient.close()).recover{
+            case t: Throwable =>

flink git commit: Remove unused imports from RichMapPartitionFunction. Wrap way too long statements in NetworkBufferPool.

2015-03-12 Thread hsaputra
Repository: flink
Updated Branches:
  refs/heads/master 92e809d1b -> 378b487c4


Remove unused imports from RichMapPartitionFunction. Wrap way too long 
statements in NetworkBufferPool.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/378b487c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/378b487c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/378b487c

Branch: refs/heads/master
Commit: 378b487c4372f5a4cd51e356361b027e66d6225f
Parents: 92e809d
Author: Henry Saputra henry.sapu...@gmail.com
Authored: Thu Mar 12 15:50:16 2015 -0700
Committer: Henry Saputra henry.sapu...@gmail.com
Committed: Thu Mar 12 15:50:16 2015 -0700

--
 .../flink/api/common/functions/RichMapPartitionFunction.java   | 3 ---
 .../flink/runtime/io/network/buffer/NetworkBufferPool.java | 6 --
 2 files changed, 4 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/378b487c/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
index 4c288ca..d8b7e9b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.api.common.functions;
 
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/378b487c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 8de7c36..3a6dbf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -74,12 +74,14 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
 			int missingMb = requiredMb - allocatedMb;
 
-			throw new OutOfMemoryError("Could not allocate enough memory segments for GlobalBufferPool (required (Mb): " + requiredMb + ", allocated (Mb): " + allocatedMb + ", missing (Mb): " + missingMb + ").");
+			throw new OutOfMemoryError("Could not allocate enough memory segments for GlobalBufferPool (required (Mb): " +
+					requiredMb + ", allocated (Mb): " + allocatedMb + ", missing (Mb): " + missingMb + ").");
 		}
 
 		int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
 
-		LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).", allocatedMb, availableMemorySegments.size(), segmentSize);
+		LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
+				allocatedMb, availableMemorySegments.size(), segmentSize);
}
 
public MemorySegment requestMemorySegment() {