http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
new file mode 100644
index 0000000..4871b96
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -0,0 +1,827 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.ExecutionContexts$;
+import akka.util.Timeout;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.HybridMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.NetUtils;
+
+import scala.Tuple2;
+import scala.Option;
+import scala.Some;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * TaskExecutor implementation. The task executor is responsible for the 
execution of multiple
+ * {@link org.apache.flink.runtime.taskmanager.Task}.
+ */
+public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+       /** The unique resource ID of this TaskExecutor */
+       private final ResourceID resourceID;
+
+       /** The access to the leader election and metadata storage services */
+       private final HighAvailabilityServices haServices;
+
+       /** The task manager configuration */
+       private final TaskExecutorConfiguration taskExecutorConfig;
+
+       /** The I/O manager component in the task manager */
+       private final IOManager ioManager;
+
+       /** The memory manager component in the task manager */
+       private final MemoryManager memoryManager;
+
+       /** The network component in the task manager */
+       private final NetworkEnvironment networkEnvironment;
+
+       /** The number of slots in the task manager, should be 1 for YARN */
+       private final int numberOfSlots;
+
+       // --------- resource manager --------
+
+       private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
+
+       // 
------------------------------------------------------------------------
+
+       public TaskExecutor(
+                       TaskExecutorConfiguration taskExecutorConfig,
+                       ResourceID resourceID,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       NetworkEnvironment networkEnvironment,
+                       int numberOfSlots,
+                       RpcService rpcService,
+                       HighAvailabilityServices haServices) {
+
+               super(rpcService);
+
+               this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
+               this.resourceID = checkNotNull(resourceID);
+               this.memoryManager = checkNotNull(memoryManager);
+               this.ioManager = checkNotNull(ioManager);
+               this.networkEnvironment = checkNotNull(networkEnvironment);
+               this.numberOfSlots = checkNotNull(numberOfSlots);
+               this.haServices = checkNotNull(haServices);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Life cycle
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void start() {
+               super.start();
+
+               // start by connecting to the ResourceManager
+               try {
+                       
haServices.getResourceManagerLeaderRetriever().start(new 
ResourceManagerLeaderListener());
+               } catch (Exception e) {
+                       onFatalErrorAsync(e);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  RPC methods - ResourceManager related
+       // 
------------------------------------------------------------------------
+
+       @RpcMethod
+       public void notifyOfNewResourceManagerLeader(String newLeaderAddress, 
UUID newLeaderId) {
+               if (resourceManagerConnection != null) {
+                       if (newLeaderAddress != null) {
+                               // the resource manager switched to a new leader
+                               log.info("ResourceManager leader changed from 
{} to {}. Registering at new leader.",
+                                       
resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
+                       }
+                       else {
+                               // address null means that the current leader 
is lost without a new leader being there, yet
+                               log.info("Current ResourceManager {} lost 
leader status. Waiting for new ResourceManager leader.",
+                                       
resourceManagerConnection.getResourceManagerAddress());
+                       }
+
+                       // drop the current connection or connection attempt
+                       if (resourceManagerConnection != null) {
+                               resourceManagerConnection.close();
+                               resourceManagerConnection = null;
+                       }
+               }
+
+               // establish a connection to the new leader
+               if (newLeaderAddress != null) {
+                       log.info("Attempting to register at ResourceManager 
{}", newLeaderAddress);
+                       resourceManagerConnection =
+                               new 
TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, 
newLeaderId);
+                       resourceManagerConnection.start();
+               }
+       }
+
+       /**
+        * Starts and runs the TaskManager.
+        * <p/>
+        * This method first tries to select the network interface to use for 
the TaskManager
+        * communication. The network interface is used both for the actor 
communication
+        * (coordination) as well as for the data exchange between task 
managers. Unless
+        * the hostname/interface is explicitly configured in the 
configuration, this
+        * method will try out various interfaces and methods to connect to the 
JobManager
+        * and select the one where the connection attempt is successful.
+        * <p/>
+        * After selecting the network interface, this method brings up an 
actor system
+        * for the TaskManager and its actors, starts the TaskManager's services
+        * (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+        *
+        * @param configuration    The configuration for the TaskManager.
+        * @param resourceID       The id of the resource which the task 
manager will run on.
+        */
+       public static void selectNetworkInterfaceAndRunTaskManager(
+               Configuration configuration,
+               ResourceID resourceID) throws Exception {
+
+               final InetSocketAddress taskManagerAddress = 
selectNetworkInterfaceAndPort(configuration);
+
+               runTaskManager(taskManagerAddress.getHostName(), resourceID, 
taskManagerAddress.getPort(), configuration);
+       }
+
+       private static InetSocketAddress 
selectNetworkInterfaceAndPort(Configuration configuration)
+               throws Exception {
+               String taskManagerHostname = 
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+               if (taskManagerHostname != null) {
+                       LOG.info("Using configured hostname/address for 
TaskManager: " + taskManagerHostname);
+               } else {
+                       LeaderRetrievalService leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+                       FiniteDuration lookupTimeout = 
AkkaUtils.getLookupTimeout(configuration);
+
+                       InetAddress taskManagerAddress = 
LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, 
lookupTimeout);
+                       taskManagerHostname = taskManagerAddress.getHostName();
+                       LOG.info("TaskManager will use hostname/address '{}' 
({}) for communication.",
+                               taskManagerHostname, 
taskManagerAddress.getHostAddress());
+               }
+
+               // if no task manager port has been configured, use 0 (system 
will pick any free port)
+               final int actorSystemPort = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+               if (actorSystemPort < 0 || actorSystemPort > 65535) {
+                       throw new IllegalConfigurationException("Invalid value 
for '" +
+                               ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
+                               "' (port for the TaskManager actor system) : " 
+ actorSystemPort +
+                               " - Leave config parameter empty or use 0 to 
let the system choose a port automatically.");
+               }
+
+               return new InetSocketAddress(taskManagerHostname, 
actorSystemPort);
+       }
+
+       /**
+        * Starts and runs the TaskManager. Brings up an actor system for the 
TaskManager and its
+        * actors, starts the TaskManager's services (library cache, shuffle 
network stack, ...),
+        * and starts the TaskManager itself.
+        * <p/>
+        * This method will also spawn a process reaper for the TaskManager 
(kill the process if
+        * the actor fails) and optionally start the JVM memory logging thread.
+        *
+        * @param taskManagerHostname The hostname/address of the interface 
where the actor system
+        *                            will communicate.
+        * @param resourceID          The id of the resource which the task 
manager will run on.
+        * @param actorSystemPort   The port at which the actor system will 
communicate.
+        * @param configuration       The configuration for the TaskManager.
+        */
+       private static void runTaskManager(
+               String taskManagerHostname,
+               ResourceID resourceID,
+               int actorSystemPort,
+               final Configuration configuration) throws Exception {
+
+               LOG.info("Starting TaskManager");
+
+               // Bring up the TaskManager actor system first, bind it to the 
given address.
+
+               LOG.info("Starting TaskManager actor system at " +
+                       NetUtils.hostAndPortToUrlString(taskManagerHostname, 
actorSystemPort));
+
+               final ActorSystem taskManagerSystem;
+               try {
+                       Tuple2<String, Object> address = new Tuple2<String, 
Object>(taskManagerHostname, actorSystemPort);
+                       Config akkaConfig = 
AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
+                       LOG.debug("Using akka configuration\n " + akkaConfig);
+                       taskManagerSystem = 
AkkaUtils.createActorSystem(akkaConfig);
+               } catch (Throwable t) {
+                       if (t instanceof 
org.jboss.netty.channel.ChannelException) {
+                               Throwable cause = t.getCause();
+                               if (cause != null && t.getCause() instanceof 
java.net.BindException) {
+                                       String address = 
NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
+                                       throw new IOException("Unable to bind 
TaskManager actor system to address " +
+                                               address + " - " + 
cause.getMessage(), t);
+                               }
+                       }
+                       throw new Exception("Could not create TaskManager actor 
system", t);
+               }
+
+               // start akka rpc service based on actor system
+               final Timeout timeout = new 
Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS);
+               final AkkaRpcService akkaRpcService = new 
AkkaRpcService(taskManagerSystem, timeout);
+
+               // start high availability service to implement 
getResourceManagerLeaderRetriever method only
+               final HighAvailabilityServices haServices = new 
HighAvailabilityServices() {
+                       @Override
+                       public LeaderRetrievalService 
getResourceManagerLeaderRetriever() throws Exception {
+                               return 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+                       }
+
+                       @Override
+                       public LeaderElectionService 
getResourceManagerLeaderElectionService() throws Exception {
+                               return null;
+                       }
+
+                       @Override
+                       public LeaderElectionService 
getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+                               return null;
+                       }
+               };
+
+               // start all the TaskManager services (network stack,  library 
cache, ...)
+               // and the TaskManager actor
+               try {
+                       LOG.info("Starting TaskManager actor");
+                       TaskExecutor taskExecutor = 
startTaskManagerComponentsAndActor(
+                               configuration,
+                               resourceID,
+                               akkaRpcService,
+                               taskManagerHostname,
+                               haServices,
+                               false);
+
+                       taskExecutor.start();
+
+                       // if desired, start the logging daemon that 
periodically logs the memory usage information
+                       if (LOG.isInfoEnabled() && configuration.getBoolean(
+                               
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
+                               
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
+                               LOG.info("Starting periodic memory usage 
logger");
+
+                               long interval = configuration.getLong(
+                                       
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
+                                       
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
+
+                               MemoryLogger logger = new MemoryLogger(LOG, 
interval, taskManagerSystem);
+                               logger.start();
+                       }
+
+                       // block until everything is done
+                       taskManagerSystem.awaitTermination();
+               } catch (Throwable t) {
+                       LOG.error("Error while starting up taskManager", t);
+                       try {
+                               taskManagerSystem.shutdown();
+                       } catch (Throwable tt) {
+                               LOG.warn("Could not cleanly shut down actor 
system", tt);
+                       }
+                       throw t;
+               }
+       }
+
+       // 
--------------------------------------------------------------------------
+       //  Starting and running the TaskManager
+       // 
--------------------------------------------------------------------------
+
+       /**
+        * @param configuration                 The configuration for the 
TaskManager.
+        * @param resourceID                    The id of the resource which 
the task manager will run on.
+        * @param rpcService                  The rpc service which is used to 
start and connect to the TaskManager RpcEndpoint .
+        * @param taskManagerHostname       The hostname/address that describes 
the TaskManager's data location.
+        * @param haServices        Optionally, a high availability service can 
be provided. If none is given,
+        *                                      then a HighAvailabilityServices 
is constructed from the configuration.
+        * @param localTaskManagerCommunication     If true, the TaskManager 
will not initiate the TCP network stack.
+        * @return An ActorRef to the TaskManager actor.
+        * @throws org.apache.flink.configuration.IllegalConfigurationException 
    Thrown, if the given config contains illegal values.
+        * @throws java.io.IOException      Thrown, if any of the I/O 
components (such as buffer pools,
+        *                                       I/O manager, ...) cannot be 
properly started.
+        * @throws java.lang.Exception      Thrown is some other error occurs 
while parsing the configuration
+        *                                      or starting the TaskManager 
components.
+        */
+       public static TaskExecutor startTaskManagerComponentsAndActor(
+               Configuration configuration,
+               ResourceID resourceID,
+               RpcService rpcService,
+               String taskManagerHostname,
+               HighAvailabilityServices haServices,
+               boolean localTaskManagerCommunication) throws Exception {
+
+               final TaskExecutorConfiguration taskExecutorConfig = 
parseTaskManagerConfiguration(
+                       configuration, taskManagerHostname, 
localTaskManagerCommunication);
+
+               MemoryType memType = 
taskExecutorConfig.getNetworkConfig().memoryType();
+
+               // pre-start checks
+               checkTempDirs(taskExecutorConfig.getTmpDirPaths());
+
+               ExecutionContext executionContext = 
ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool());
+
+               // we start the network first, to make sure it can allocate its 
buffers first
+               final NetworkEnvironment network = new NetworkEnvironment(
+                       executionContext,
+                       taskExecutorConfig.getTimeout(),
+                       taskExecutorConfig.getNetworkConfig(),
+                       taskExecutorConfig.getConnectionInfo());
+
+               // computing the amount of memory to use depends on how much 
memory is available
+               // it strictly needs to happen AFTER the network stack has been 
initialized
+
+               // check if a value has been configured
+               long configuredMemory = 
configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+               checkConfigParameter(configuredMemory == -1 || configuredMemory 
> 0, configuredMemory,
+                       ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+                       "MemoryManager needs at least one MB of memory. " +
+                               "If you leave this config parameter empty, the 
system automatically " +
+                               "pick a fraction of the available memory.");
+
+               final long memorySize;
+               boolean preAllocateMemory = configuration.getBoolean(
+                       ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+                       
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+               if (configuredMemory > 0) {
+                       if (preAllocateMemory) {
+                               LOG.info("Using {} MB for managed memory." , 
configuredMemory);
+                       } else {
+                               LOG.info("Limiting managed memory to {} MB, 
memory will be allocated lazily." , configuredMemory);
+                       }
+                       memorySize = configuredMemory << 20; // megabytes to 
bytes
+               } else {
+                       float fraction = configuration.getFloat(
+                               
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+                               
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+                       checkConfigParameter(fraction > 0.0f && fraction < 
1.0f, fraction,
+                               
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+                               "MemoryManager fraction of the free memory must 
be between 0.0 and 1.0");
+
+                       if (memType == MemoryType.HEAP) {
+                               long relativeMemSize = (long) 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
+                               if (preAllocateMemory) {
+                                       LOG.info("Using {} of the currently 
free heap space for managed heap memory ({} MB)." ,
+                                               fraction , relativeMemSize >> 
20);
+                               } else {
+                                       LOG.info("Limiting managed memory to {} 
of the currently free heap space ({} MB), " +
+                                               "memory will be allocated 
lazily." , fraction , relativeMemSize >> 20);
+                               }
+                               memorySize = relativeMemSize;
+                       } else if (memType == MemoryType.OFF_HEAP) {
+                               // The maximum heap memory has been adjusted 
according to the fraction
+                               long maxMemory = 
EnvironmentInformation.getMaxJvmHeapMemory();
+                               long directMemorySize = (long) (maxMemory / 
(1.0 - fraction) * fraction);
+                               if (preAllocateMemory) {
+                                       LOG.info("Using {} of the maximum 
memory size for managed off-heap memory ({} MB)." ,
+                                               fraction, directMemorySize >> 
20);
+                               } else {
+                                       LOG.info("Limiting managed memory to {} 
of the maximum memory size ({} MB)," +
+                                               " memory will be allocated 
lazily.", fraction, directMemorySize >> 20);
+                               }
+                               memorySize = directMemorySize;
+                       } else {
+                               throw new RuntimeException("No supported memory 
type detected.");
+                       }
+               }
+
+               // now start the memory manager
+               final MemoryManager memoryManager;
+               try {
+                       memoryManager = new MemoryManager(
+                               memorySize,
+                               taskExecutorConfig.getNumberOfSlots(),
+                               
taskExecutorConfig.getNetworkConfig().networkBufferSize(),
+                               memType,
+                               preAllocateMemory);
+               } catch (OutOfMemoryError e) {
+                       if (memType == MemoryType.HEAP) {
+                               throw new Exception("OutOfMemory error (" + 
e.getMessage() +
+                                       ") while allocating the TaskManager 
heap memory (" + memorySize + " bytes).", e);
+                       } else if (memType == MemoryType.OFF_HEAP) {
+                               throw new Exception("OutOfMemory error (" + 
e.getMessage() +
+                                       ") while allocating the TaskManager 
off-heap memory (" + memorySize +
+                                       " bytes).Try increasing the maximum 
direct memory (-XX:MaxDirectMemorySize)", e);
+                       } else {
+                               throw e;
+                       }
+               }
+
+               // start the I/O manager, it will create some temp directories.
+               final IOManager ioManager = new 
IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
+
+               final TaskExecutor taskExecutor = new TaskExecutor(
+                       taskExecutorConfig,
+                       resourceID,
+                       memoryManager,
+                       ioManager,
+                       network,
+                       taskExecutorConfig.getNumberOfSlots(),
+                       rpcService,
+                       haServices);
+
+               return taskExecutor;
+       }
+
+       // 
--------------------------------------------------------------------------
+       //  Parsing and checking the TaskManager Configuration
+       // 
--------------------------------------------------------------------------
+
+       /**
+        * Utility method to extract TaskManager config parameters from the 
configuration and to
+        * sanity check them.
+        *
+        * @param configuration                 The configuration.
+        * @param taskManagerHostname           The host name under which the 
TaskManager communicates.
+        * @param localTaskManagerCommunication             True, to skip 
initializing the network stack.
+        *                                      Use only in cases where only 
one task manager runs.
+        * @return TaskExecutorConfiguration that wrappers 
InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
+        */
+       private static TaskExecutorConfiguration parseTaskManagerConfiguration(
+               Configuration configuration,
+               String taskManagerHostname,
+               boolean localTaskManagerCommunication) throws Exception {
+
+               // ------- read values from the config and check them ---------
+               //                      (a lot of them)
+
+               // ----> hosts / ports for communication and data exchange
+
+               int dataport = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+                       ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+               if (dataport == 0) {
+                       dataport = NetUtils.getAvailablePort();
+               }
+               checkConfigParameter(dataport > 0, dataport, 
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+                       "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+               InetAddress taskManagerAddress = 
InetAddress.getByName(taskManagerHostname);
+               final InstanceConnectionInfo connectionInfo = new 
InstanceConnectionInfo(taskManagerAddress, dataport);
+
+               // ----> memory / network stack (shuffles/broadcasts), task 
slots, temp directories
+
+               // we need this because many configs have been written with a 
"-1" entry
+               int slots = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+               if (slots == -1) {
+                       slots = 1;
+               }
+               checkConfigParameter(slots >= 1, slots, 
ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+                       "Number of task slots must be at least one.");
+
+               final int numNetworkBuffers = configuration.getInteger(
+                       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+                       
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+               checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+                       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 
"");
+
+               final int pageSize = configuration.getInteger(
+                       ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+                       
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+               // check page size of for minimum size
+               checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, 
pageSize,
+                       ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+                       "Minimum memory segment size is " + 
MemoryManager.MIN_PAGE_SIZE);
+               // check page size for power of two
+               checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+                       ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+                       "Memory segment size must be a power of 2.");
+
+               // check whether we use heap or off-heap memory
+               final MemoryType memType;
+               if 
(configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, 
false)) {
+                       memType = MemoryType.OFF_HEAP;
+               } else {
+                       memType = MemoryType.HEAP;
+               }
+
+               // initialize the memory segment factory accordingly
+               if (memType == MemoryType.HEAP) {
+                       if 
(!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
+                               throw new Exception("Memory type is set to heap 
memory, but memory segment " +
+                                       "factory has been initialized for 
off-heap memory segments");
+                       }
+               } else {
+                       if 
(!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) 
{
+                               throw new Exception("Memory type is set to 
off-heap memory, but memory segment " +
+                                       "factory has been initialized for heap 
memory segments");
+                       }
+               }
+
+               final String[] tmpDirs = configuration.getString(
+                       ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+                       
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+               final NettyConfig nettyConfig;
+               if (!localTaskManagerCommunication) {
+                       nettyConfig = new NettyConfig(connectionInfo.address(), 
connectionInfo.dataPort(), pageSize, slots, configuration);
+               } else {
+                       nettyConfig = null;
+               }
+
+               // Default spill I/O mode for intermediate results
+               final String syncOrAsync = configuration.getString(
+                       ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+                       
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
+
+               final IOMode ioMode;
+               if (syncOrAsync.equals("async")) {
+                       ioMode = IOManager.IOMode.ASYNC;
+               } else {
+                       ioMode = IOManager.IOMode.SYNC;
+               }
+
+               final int queryServerPort =  configuration.getInteger(
+                       ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
+                       ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
+
+               final int queryServerNetworkThreads =  configuration.getInteger(
+                       ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
+                       
ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
+
+               final int queryServerQueryThreads =  configuration.getInteger(
+                       ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
+                       
ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
+
+               final NetworkEnvironmentConfiguration networkConfig = new 
NetworkEnvironmentConfiguration(
+                       numNetworkBuffers,
+                       pageSize,
+                       memType,
+                       ioMode,
+                       queryServerPort,
+                       queryServerNetworkThreads,
+                       queryServerQueryThreads,
+                       localTaskManagerCommunication ? 
Option.<NettyConfig>empty() : new Some<>(nettyConfig),
+                       new Tuple2<>(500, 3000));
+
+               // ----> timeouts, library caching, profiling
+
+               final FiniteDuration timeout;
+               try {
+                       timeout = AkkaUtils.getTimeout(configuration);
+               } catch (Exception e) {
+                       throw new IllegalArgumentException(
+                               "Invalid format for '" + 
ConfigConstants.AKKA_ASK_TIMEOUT +
+                                       "'.Use formats like '50 s' or '1 min' 
to specify the timeout.");
+               }
+               LOG.info("Messages between TaskManager and JobManager have a 
max timeout of " + timeout);
+
+               final long cleanupInterval = configuration.getLong(
+                       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+                       
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+
+               final FiniteDuration finiteRegistrationDuration;
+               try {
+                       Duration maxRegistrationDuration = 
Duration.create(configuration.getString(
+                               
ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+                               
ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
+                       if (maxRegistrationDuration.isFinite()) {
+                               finiteRegistrationDuration = new 
FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS);
+                       } else {
+                               finiteRegistrationDuration = null;
+                       }
+               } catch (NumberFormatException e) {
+                       throw new IllegalArgumentException("Invalid format for 
parameter " +
+                               
ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
+               }
+
+               final FiniteDuration initialRegistrationPause;
+               try {
+                       Duration pause = 
Duration.create(configuration.getString(
+                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+                               
ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
+                       if (pause.isFinite()) {
+                               initialRegistrationPause = new 
FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
+                       } else {
+                               throw new IllegalArgumentException("The initial 
registration pause must be finite: " + pause);
+                       }
+               } catch (NumberFormatException e) {
+                       throw new IllegalArgumentException("Invalid format for 
parameter " +
+                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+               }
+
+               final FiniteDuration maxRegistrationPause;
+               try {
+                       Duration pause = 
Duration.create(configuration.getString(
+                               
ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
+                               
ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
+                       if (pause.isFinite()) {
+                               maxRegistrationPause = new 
FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
+                       } else {
+                               throw new IllegalArgumentException("The maximum 
registration pause must be finite: " + pause);
+                       }
+               } catch (NumberFormatException e) {
+                       throw new IllegalArgumentException("Invalid format for 
parameter " +
+                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+               }
+
+               final FiniteDuration refusedRegistrationPause;
+               try {
+                       Duration pause = 
Duration.create(configuration.getString(
+                               
ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+                               
ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
+                       if (pause.isFinite()) {
+                               refusedRegistrationPause = new 
FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
+                       } else {
+                               throw new IllegalArgumentException("The refused 
registration pause must be finite: " + pause);
+                       }
+               } catch (NumberFormatException e) {
+                       throw new IllegalArgumentException("Invalid format for 
parameter " +
+                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+               }
+
+               return new TaskExecutorConfiguration(
+                       tmpDirs,
+                       cleanupInterval,
+                       connectionInfo,
+                       networkConfig,
+                       timeout,
+                       finiteRegistrationDuration,
+                       slots,
+                       configuration,
+                       initialRegistrationPause,
+                       maxRegistrationPause,
+                       refusedRegistrationPause);
+       }
+
+       /**
+        * Validates a condition for a config parameter and displays a standard 
exception, if the
+        * the condition does not hold.
+        *
+        * @param condition    The condition that must hold. If the condition 
is false, an exception is thrown.
+        * @param parameter    The parameter value. Will be shown in the 
exception message.
+        * @param name         The name of the config parameter. Will be shown 
in the exception message.
+        * @param errorMessage The optional custom error message to append to 
the exception message.
+        */
+       private static void checkConfigParameter(
+               boolean condition,
+               Object parameter,
+               String name,
+               String errorMessage) {
+               if (!condition) {
+                       throw new IllegalConfigurationException("Invalid 
configuration value for " + name + " : " + parameter + " - " + errorMessage);
+               }
+       }
+
+       /**
+        * Validates that all the directories denoted by the strings do 
actually exist, are proper
+        * directories (not files), and are writable.
+        *
+        * @param tmpDirs The array of directory paths to check.
+        * @throws Exception Thrown if any of the directories does not exist or 
is not writable
+        *                   or is a file, rather than a directory.
+        */
+       private static void checkTempDirs(String[] tmpDirs) throws IOException {
+               for (String dir : tmpDirs) {
+                       if (dir != null && !dir.equals("")) {
+                               File file = new File(dir);
+                               if (!file.exists()) {
+                                       throw new IOException("Temporary file 
directory " + file.getAbsolutePath() + " does not exist.");
+                               }
+                               if (!file.isDirectory()) {
+                                       throw new IOException("Temporary file 
directory " + file.getAbsolutePath() + " is not a directory.");
+                               }
+                               if (!file.canWrite()) {
+                                       throw new IOException("Temporary file 
directory " + file.getAbsolutePath() + " is not writable.");
+                               }
+
+                               if (LOG.isInfoEnabled()) {
+                                       long totalSpaceGb = 
file.getTotalSpace() >> 30;
+                                       long usableSpaceGb = 
file.getUsableSpace() >> 30;
+                                       double usablePercentage = 
(double)usableSpaceGb / totalSpaceGb * 100;
+                                       String path = file.getAbsolutePath();
+                                       LOG.info(String.format("Temporary file 
directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
+                                               path, totalSpaceGb, 
usableSpaceGb, usablePercentage));
+                               }
+                       } else {
+                               throw new IllegalArgumentException("Temporary 
file directory #$id is null.");
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+
+       public ResourceID getResourceID() {
+               return resourceID;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Error Handling
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Notifies the TaskExecutor that a fatal error has occurred and it 
cannot proceed.
+        * This method should be used when asynchronous threads want to notify 
the
+        * TaskExecutor of a fatal error.
+        *
+        * @param t The exception describing the fatal error
+        */
+       void onFatalErrorAsync(final Throwable t) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               onFatalError(t);
+                       }
+               });
+       }
+
+       /**
+        * Notifies the TaskExecutor that a fatal error has occurred and it 
cannot proceed.
+        * This method must only be called from within the TaskExecutor's main 
thread.
+        *
+        * @param t The exception describing the fatal error
+        */
+       void onFatalError(Throwable t) {
+               // to be determined, probably delegate to a fatal error handler 
that 
+               // would either log (mini cluster) ot kill the process (yarn, 
mesos, ...)
+               log.error("FATAL ERROR", t);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Access to fields for testing
+       // 
------------------------------------------------------------------------
+
+       @VisibleForTesting
+       TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
+               return resourceManagerConnection;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utility classes
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The listener for leader changes of the resource manager
+        */
+       private class ResourceManagerLeaderListener implements 
LeaderRetrievalListener {
+
+               @Override
+               public void notifyLeaderAddress(String leaderAddress, UUID 
leaderSessionID) {
+                       
getSelf().notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+               }
+
+               @Override
+               public void handleError(Exception exception) {
+                       onFatalErrorAsync(exception);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
new file mode 100644
index 0000000..3707a47
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link TaskExecutor} Configuration
+ */
+public class TaskExecutorConfiguration implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private final String[] tmpDirPaths;
+
+       private final long cleanupInterval;
+
+       private final int numberOfSlots;
+
+       private final Configuration configuration;
+
+       private final FiniteDuration timeout;
+       private final FiniteDuration maxRegistrationDuration;
+       private final FiniteDuration initialRegistrationPause;
+       private final FiniteDuration maxRegistrationPause;
+       private final FiniteDuration refusedRegistrationPause;
+
+       private final NetworkEnvironmentConfiguration networkConfig;
+
+       private final InstanceConnectionInfo connectionInfo;
+
+       public TaskExecutorConfiguration(
+                       String[] tmpDirPaths,
+                       long cleanupInterval,
+                       InstanceConnectionInfo connectionInfo,
+                       NetworkEnvironmentConfiguration networkConfig,
+                       FiniteDuration timeout,
+                       FiniteDuration maxRegistrationDuration,
+                       int numberOfSlots,
+                       Configuration configuration) {
+
+               this (tmpDirPaths,
+                       cleanupInterval,
+                       connectionInfo,
+                       networkConfig,
+                       timeout,
+                       maxRegistrationDuration,
+                       numberOfSlots,
+                       configuration,
+                       new FiniteDuration(500, TimeUnit.MILLISECONDS),
+                       new FiniteDuration(30, TimeUnit.SECONDS),
+                       new FiniteDuration(10, TimeUnit.SECONDS));
+       }
+
+       public TaskExecutorConfiguration(
+                       String[] tmpDirPaths,
+                       long cleanupInterval,
+                       InstanceConnectionInfo connectionInfo,
+                       NetworkEnvironmentConfiguration networkConfig,
+                       FiniteDuration timeout,
+                       FiniteDuration maxRegistrationDuration,
+                       int numberOfSlots,
+                       Configuration configuration,
+                       FiniteDuration initialRegistrationPause,
+                       FiniteDuration maxRegistrationPause,
+                       FiniteDuration refusedRegistrationPause) {
+
+               this.tmpDirPaths = checkNotNull(tmpDirPaths);
+               this.cleanupInterval = checkNotNull(cleanupInterval);
+               this.connectionInfo = checkNotNull(connectionInfo);
+               this.networkConfig = checkNotNull(networkConfig);
+               this.timeout = checkNotNull(timeout);
+               this.maxRegistrationDuration = maxRegistrationDuration;
+               this.numberOfSlots = checkNotNull(numberOfSlots);
+               this.configuration = checkNotNull(configuration);
+               this.initialRegistrationPause = 
checkNotNull(initialRegistrationPause);
+               this.maxRegistrationPause = checkNotNull(maxRegistrationPause);
+               this.refusedRegistrationPause = 
checkNotNull(refusedRegistrationPause);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Properties
+       // 
--------------------------------------------------------------------------------------------
+
+       public String[] getTmpDirPaths() {
+               return tmpDirPaths;
+       }
+
+       public long getCleanupInterval() {
+               return cleanupInterval;
+       }
+
+       public InstanceConnectionInfo getConnectionInfo() { return 
connectionInfo; }
+
+       public NetworkEnvironmentConfiguration getNetworkConfig() { return 
networkConfig; }
+
+       public FiniteDuration getTimeout() {
+               return timeout;
+       }
+
+       public FiniteDuration getMaxRegistrationDuration() {
+               return maxRegistrationDuration;
+       }
+
+       public int getNumberOfSlots() {
+               return numberOfSlots;
+       }
+
+       public Configuration getConfiguration() {
+               return configuration;
+       }
+
+       public FiniteDuration getInitialRegistrationPause() {
+               return initialRegistrationPause;
+       }
+
+       public FiniteDuration getMaxRegistrationPause() {
+               return maxRegistrationPause;
+       }
+
+       public FiniteDuration getRefusedRegistrationPause() {
+               return refusedRegistrationPause;
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
new file mode 100644
index 0000000..6c99706
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+import java.util.UUID;
+
+/**
+ * {@link TaskExecutor} RPC gateway interface
+ */
+public interface TaskExecutorGateway extends RpcGateway {
+
+       // 
------------------------------------------------------------------------
+       //  ResourceManager handlers
+       // 
------------------------------------------------------------------------
+
+       void notifyOfNewResourceManagerLeader(String address, UUID 
resourceManagerLeaderId);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
new file mode 100644
index 0000000..b357f52
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+
+import java.io.Serializable;
+
+/**
+ * Base class for responses from the ResourceManager to a registration attempt 
by a
+ * TaskExecutor.
+ */
+public final class TaskExecutorRegistrationSuccess extends 
RegistrationResponse.Success implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private final InstanceID registrationId;
+
+       private final long heartbeatInterval;
+
+       /**
+        * Create a new {@code TaskExecutorRegistrationSuccess} message.
+        * 
+        * @param registrationId     The ID that the ResourceManager assigned 
the registration.
+        * @param heartbeatInterval  The interval in which the ResourceManager 
will heartbeat the TaskExecutor.
+        */
+       public TaskExecutorRegistrationSuccess(InstanceID registrationId, long 
heartbeatInterval) {
+               this.registrationId = registrationId;
+               this.heartbeatInterval = heartbeatInterval;
+       }
+
+       /**
+        * Gets the ID that the ResourceManager assigned the registration.
+        */
+       public InstanceID getRegistrationId() {
+               return registrationId;
+       }
+
+       /**
+        * Gets the interval in which the ResourceManager will heartbeat the 
TaskExecutor.
+        */
+       public long getHeartbeatInterval() {
+               return heartbeatInterval;
+       }
+
+       @Override
+       public String toString() {
+               return "TaskExecutorRegistrationSuccess (" + registrationId + " 
/ " + heartbeatInterval + ')';
+       }
+
+}
+
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
new file mode 100644
index 0000000..25332a0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RetryingRegistration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The connection between a TaskExecutor and the ResourceManager.
+ */
+public class TaskExecutorToResourceManagerConnection {
+
+       /** the logger for all log messages of this class */
+       private final Logger log;
+
+       /** the TaskExecutor whose connection to the ResourceManager this 
represents */
+       private final TaskExecutor taskExecutor;
+
+       private final UUID resourceManagerLeaderId;
+
+       private final String resourceManagerAddress;
+
+       private 
TaskExecutorToResourceManagerConnection.ResourceManagerRegistration 
pendingRegistration;
+
+       private ResourceManagerGateway registeredResourceManager;
+
+       private InstanceID registrationId;
+
+       /** flag indicating that the connection is closed */
+       private volatile boolean closed;
+
+
+       public TaskExecutorToResourceManagerConnection(
+                       Logger log,
+                       TaskExecutor taskExecutor,
+                       String resourceManagerAddress,
+                       UUID resourceManagerLeaderId) {
+
+               this.log = checkNotNull(log);
+               this.taskExecutor = checkNotNull(taskExecutor);
+               this.resourceManagerAddress = 
checkNotNull(resourceManagerAddress);
+               this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Life cycle
+       // 
------------------------------------------------------------------------
+
+       @SuppressWarnings("unchecked")
+       public void start() {
+               checkState(!closed, "The connection is already closed");
+               checkState(!isRegistered() && pendingRegistration == null, "The 
connection is already started");
+
+               pendingRegistration = new 
TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
+                               log, taskExecutor.getRpcService(),
+                               resourceManagerAddress, resourceManagerLeaderId,
+                               taskExecutor.getAddress(), 
taskExecutor.getResourceID());
+               pendingRegistration.startRegistration();
+
+               Future<Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
+               
+               future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess>>() {
+                       @Override
+                       public void onSuccess(Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> result) {
+                               registeredResourceManager = result.f0;
+                               registrationId = result.f1.getRegistrationId();
+                       }
+               }, taskExecutor.getMainThreadExecutionContext());
+               
+               // this future should only ever fail if there is a bug, not if 
the registration is declined
+               future.onFailure(new OnFailure() {
+                       @Override
+                       public void onFailure(Throwable failure) {
+                               taskExecutor.onFatalError(failure);
+                       }
+               }, taskExecutor.getMainThreadExecutionContext());
+       }
+
+       public void close() {
+               closed = true;
+
+               // make sure we do not keep re-trying forever
+               if (pendingRegistration != null) {
+                       pendingRegistration.cancel();
+               }
+       }
+
+       public boolean isClosed() {
+               return closed;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+
+       public UUID getResourceManagerLeaderId() {
+               return resourceManagerLeaderId;
+       }
+
+       public String getResourceManagerAddress() {
+               return resourceManagerAddress;
+       }
+
+       /**
+        * Gets the ResourceManagerGateway. This returns null until the 
registration is completed.
+        */
+       public ResourceManagerGateway getResourceManager() {
+               return registeredResourceManager;
+       }
+
+       /**
+        * Gets the ID under which the TaskExecutor is registered at the 
ResourceManager.
+        * This returns null until the registration is completed.
+        */
+       public InstanceID getRegistrationId() {
+               return registrationId;
+       }
+
+       public boolean isRegistered() {
+               return registeredResourceManager != null;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return String.format("Connection to ResourceManager %s 
(leaderId=%s)",
+                               resourceManagerAddress, 
resourceManagerLeaderId); 
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       private static class ResourceManagerRegistration
+                       extends RetryingRegistration<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> {
+
+               private final String taskExecutorAddress;
+               
+               private final ResourceID resourceID;
+
+               ResourceManagerRegistration(
+                               Logger log,
+                               RpcService rpcService,
+                               String targetAddress,
+                               UUID leaderId,
+                               String taskExecutorAddress,
+                               ResourceID resourceID) {
+
+                       super(log, rpcService, "ResourceManager", 
ResourceManagerGateway.class, targetAddress, leaderId);
+                       this.taskExecutorAddress = 
checkNotNull(taskExecutorAddress);
+                       this.resourceID = checkNotNull(resourceID);
+               }
+
+               @Override
+               protected Future<RegistrationResponse> invokeRegistration(
+                               ResourceManagerGateway resourceManager, UUID 
leaderId, long timeoutMillis) throws Exception {
+
+                       FiniteDuration timeout = new 
FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS);
+                       return resourceManager.registerTaskExecutor(leaderId, 
taskExecutorAddress, resourceID, timeout);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
new file mode 100644
index 0000000..744308c
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import 
org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+
+
+/**
+ * Runs tests to ensure that a cluster is shutdown properly.
+ */
+public class ClusterShutdownITCase extends TestLogger {
+
+       private static ActorSystem system;
+
+       private static Configuration config = new Configuration();
+
+       @BeforeClass
+       public static void setup() {
+               system = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(system);
+       }
+
+       /**
+        * Tests a faked cluster shutdown procedure without the ResourceManager.
+        */
+       @Test
+       public void testClusterShutdownWithoutResourceManager() {
+
+               new JavaTestKit(system){{
+               new Within(duration("30 seconds")) {
+               @Override
+               protected void run() {
+
+                       ActorGateway me =
+                               TestingUtils.createForwardingActor(system, 
getTestActor(), Option.<String>empty());
+
+                       // start job manager which doesn't shutdown the actor 
system
+                       ActorGateway jobManager =
+                               TestingUtils.createJobManager(system, config, 
"jobmanager1");
+
+                       // Tell the JobManager to inform us of shutdown actions
+                       
jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
+
+                       // Register a TaskManager
+                       ActorGateway taskManager =
+                               TestingUtils.createTaskManager(system, 
jobManager, config, true, true);
+
+                       // Tell the TaskManager to inform us of TaskManager 
shutdowns
+                       
taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
+
+
+                       // No resource manager connected
+                       jobManager.tell(new 
StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
+
+                       expectMsgAllOf(
+                               new 
TestingMessages.ComponentShutdown(taskManager.actor()),
+                               new 
TestingMessages.ComponentShutdown(jobManager.actor()),
+                               StopClusterSuccessful.getInstance()
+                       );
+
+               }};
+               }};
+       }
+
+       /**
+        * Tests a faked cluster shutdown procedure with the ResourceManager.
+        */
+       @Test
+       public void testClusterShutdownWithResourceManager() {
+
+               new JavaTestKit(system){{
+               new Within(duration("30 seconds")) {
+               @Override
+               protected void run() {
+
+                       ActorGateway me =
+                               TestingUtils.createForwardingActor(system, 
getTestActor(), Option.<String>empty());
+
+                       // start job manager which doesn't shutdown the actor 
system
+                       ActorGateway jobManager =
+                               TestingUtils.createJobManager(system, config, 
"jobmanager2");
+
+                       // Tell the JobManager to inform us of shutdown actions
+                       
jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
+
+                       // Register a TaskManager
+                       ActorGateway taskManager =
+                               TestingUtils.createTaskManager(system, 
jobManager, config, true, true);
+
+                       // Tell the TaskManager to inform us of TaskManager 
shutdowns
+                       
taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
+
+                       // Start resource manager and let it register
+                       ActorGateway resourceManager =
+                               TestingUtils.createResourceManager(system, 
jobManager.actor(), config);
+
+                       // Tell the ResourceManager to inform us of 
ResourceManager shutdowns
+                       
resourceManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
+
+                       // notify about a resource manager registration at the 
job manager
+                       resourceManager.tell(new 
TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+                       // Wait for resource manager
+                       expectMsgEquals(Messages.getAcknowledge());
+
+
+                       // Shutdown cluster with resource manager connected
+                       jobManager.tell(new 
StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
+
+                       expectMsgAllOf(
+                               new 
TestingMessages.ComponentShutdown(taskManager.actor()),
+                               new 
TestingMessages.ComponentShutdown(jobManager.actor()),
+                               new 
TestingMessages.ComponentShutdown(resourceManager.actor()),
+                               StopClusterSuccessful.getInstance()
+                       );
+
+               }};
+               }};
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
new file mode 100644
index 0000000..1565dc3
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * It cases which test the interaction of the resource manager with job 
manager and task managers.
+ * Runs all tests in one Actor system.
+ */
+public class ResourceManagerITCase extends TestLogger {
+
+       private static ActorSystem system;
+
+       private static Configuration config = new Configuration();
+
+       @BeforeClass
+       public static void setup() {
+               system = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(system);
+       }
+
+       /**
+        * Tests whether the resource manager connects and reconciles existing 
task managers.
+        */
+       @Test
+       public void testResourceManagerReconciliation() {
+
+               new JavaTestKit(system){{
+               new Within(duration("10 seconds")) {
+               @Override
+               protected void run() {
+
+                       ActorGateway jobManager =
+                               TestingUtils.createJobManager(system, config, 
"ReconciliationTest");
+                       ActorGateway me =
+                               TestingUtils.createForwardingActor(system, 
getTestActor(), Option.<String>empty());
+
+                       // !! no resource manager started !!
+
+                       ResourceID resourceID = ResourceID.generate();
+
+                       TaskManagerLocation location = 
mock(TaskManagerLocation.class);
+                       when(location.getResourceID()).thenReturn(resourceID);
+
+                       HardwareDescription resourceProfile = 
HardwareDescription.extractFromSystem(1_000_000);
+
+                       jobManager.tell(
+                               new 
RegistrationMessages.RegisterTaskManager(resourceID, location, resourceProfile, 
1),
+                               me);
+
+                       
expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
+
+                       // now start the resource manager
+                       ActorGateway resourceManager =
+                               TestingUtils.createResourceManager(system, 
jobManager.actor(), config);
+
+                       // register at testing job manager to receive a message 
once a resource manager registers
+                       resourceManager.tell(new 
TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+                       // Wait for resource manager
+                       expectMsgEquals(Messages.getAcknowledge());
+
+                       // check if we registered the task manager resource
+                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), me);
+
+                       TestingResourceManager.GetRegisteredResourcesReply 
reply =
+                               
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+                       assertEquals(1, reply.resources.size());
+                       assertTrue(reply.resources.contains(resourceID));
+
+               }};
+               }};
+       }
+
+       /**
+        * Tests whether the resource manager gets informed upon TaskManager 
registration.
+        */
+       @Test
+       public void testResourceManagerTaskManagerRegistration() {
+
+               new JavaTestKit(system){{
+               new Within(duration("30 seconds")) {
+               @Override
+               protected void run() {
+
+                       ActorGateway jobManager =
+                               TestingUtils.createJobManager(system, config, 
"RegTest");
+                       ActorGateway me =
+                               TestingUtils.createForwardingActor(system, 
getTestActor(), Option.<String>empty());
+
+                       // start the resource manager
+                       ActorGateway resourceManager =
+                               TestingUtils.createResourceManager(system, 
jobManager.actor(), config);
+
+                       // notify about a resource manager registration at the 
job manager
+                       resourceManager.tell(new 
TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+                       // Wait for resource manager
+                       expectMsgEquals(Messages.getAcknowledge());
+
+                       // start task manager and wait for registration
+                       ActorGateway taskManager =
+                               TestingUtils.createTaskManager(system, 
jobManager.actor(), config, true, true);
+
+                       // check if we registered the task manager resource
+                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), me);
+
+                       TestingResourceManager.GetRegisteredResourcesReply 
reply =
+                               
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+                       assertEquals(1, reply.resources.size());
+
+               }};
+               }};
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
new file mode 100644
index 0000000..ca8a07a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import 
org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * General tests for the resource manager component.
+ */
+public class ResourceManagerTest {
+
+       private static ActorSystem system;
+
+       private static ActorGateway fakeJobManager;
+       private static ActorGateway resourceManager;
+
+       private static Configuration config = new Configuration();
+
+       @BeforeClass
+       public static void setup() {
+               system = AkkaUtils.createLocalActorSystem(config);
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(system);
+       }
+
+       /**
+        * Tests the registration and reconciliation of the ResourceManager 
with the JobManager
+        */
+       @Test
+       public void testJobManagerRegistrationAndReconciliation() {
+               new JavaTestKit(system){{
+               new Within(duration("10 seconds")) {
+               @Override
+               protected void run() {
+                       fakeJobManager = 
TestingUtils.createForwardingActor(system, getTestActor(), 
Option.<String>empty());
+                       resourceManager = 
TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+
+                       expectMsgClass(RegisterResourceManager.class);
+
+                       List<ResourceID> resourceList = new ArrayList<>();
+                       resourceList.add(ResourceID.generate());
+                       resourceList.add(ResourceID.generate());
+                       resourceList.add(ResourceID.generate());
+
+                       resourceManager.tell(
+                               new 
RegisterResourceManagerSuccessful(fakeJobManager.actor(), resourceList),
+                               fakeJobManager);
+
+                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+                       TestingResourceManager.GetRegisteredResourcesReply 
reply =
+                               
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+                       for (ResourceID id : resourceList) {
+                               if (!reply.resources.contains(id)) {
+                                       fail("Expected to find all resources 
that were provided during registration.");
+                               }
+                       }
+               }};
+               }};
+       }
+
+       /**
+        * Tests delayed or erroneous registration of the ResourceManager with 
the JobManager
+        */
+       @Test
+       public void testDelayedJobManagerRegistration() {
+               new JavaTestKit(system){{
+               new Within(duration("10 seconds")) {
+               @Override
+               protected void run() {
+
+                       // set a short timeout for lookups
+                       Configuration shortTimeoutConfig = config.clone();
+                       
shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s");
+
+                       fakeJobManager = 
TestingUtils.createForwardingActor(system, getTestActor(), 
Option.<String>empty());
+                       resourceManager = 
TestingUtils.createResourceManager(system, fakeJobManager.actor(), 
shortTimeoutConfig);
+
+                       // wait for registration message
+                       RegisterResourceManager msg = 
expectMsgClass(RegisterResourceManager.class);
+                       // give wrong response
+                       getLastSender().tell(new 
JobManagerMessages.LeaderSessionMessage(null, new Object()),
+                               fakeJobManager.actor());
+
+                       // expect another retry and let it time out
+                       expectMsgClass(RegisterResourceManager.class);
+
+                       // wait for next try after timeout
+                       expectMsgClass(RegisterResourceManager.class);
+
+               }};
+               }};
+       }
+
+       @Test
+       public void testTriggerReconnect() {
+               new JavaTestKit(system){{
+               new Within(duration("10 seconds")) {
+               @Override
+               protected void run() {
+
+                       // set a long timeout for lookups such that the test 
fails in case of timeouts
+                       Configuration shortTimeoutConfig = config.clone();
+                       
shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s");
+
+                       fakeJobManager = 
TestingUtils.createForwardingActor(system, getTestActor(), 
Option.<String>empty());
+                       resourceManager = 
TestingUtils.createResourceManager(system, fakeJobManager.actor(), 
shortTimeoutConfig);
+
+                       // wait for registration message
+                       RegisterResourceManager msg = 
expectMsgClass(RegisterResourceManager.class);
+                       // all went well
+                       resourceManager.tell(
+                               new 
RegisterResourceManagerSuccessful(fakeJobManager.actor(), 
Collections.<ResourceID>emptyList()),
+                               fakeJobManager);
+
+                       // force a reconnect
+                       resourceManager.tell(
+                               new 
TriggerRegistrationAtJobManager(fakeJobManager.actor()),
+                               fakeJobManager);
+
+                       // new registration attempt should come in
+                       expectMsgClass(RegisterResourceManager.class);
+
+               }};
+               }};
+       }
+
+       /**
+        * Tests the registration and accounting of resources at the ResourceManager.
+        *
+        * <p>Registering a resource adds it to the registered resources. Registering the same
+        * resource again is acknowledged but does not add a duplicate. A registration with a
+        * {@code null} resource id is acknowledged as well but leaves the registered resources unchanged.
+        */
+       @Test
+       public void testTaskManagerRegistration() {
+               new JavaTestKit(system){{
+               new Within(duration("10 seconds")) {
+               @Override
+               protected void run() {
+
+                       fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+                       resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+
+                       // register with JM
+                       expectMsgClass(RegisterResourceManager.class);
+                       resourceManager.tell(
+                               new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
+                               fakeJobManager);
+
+                       ResourceID resourceID = ResourceID.generate();
+
+                       // send task manager registration
+                       resourceManager.tell(new NotifyResourceStarted(resourceID), fakeJobManager);
+
+                       expectMsgClass(Acknowledge.class);
+
+                       // check the number of registered resources
+                       resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+                       TestingResourceManager.GetRegisteredResourcesReply reply =
+                               expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+                       assertEquals(1, reply.resources.size());
+                       assertTrue(reply.resources.contains(resourceID));
+
+                       // send the same task manager registration again; it must not be counted twice
+                       resourceManager.tell(new NotifyResourceStarted(resourceID), fakeJobManager);
+
+                       expectMsgClass(Acknowledge.class);
+
+                       // check the number of registered resources
+                       resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+                       reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+                       assertEquals(1, reply.resources.size());
+                       assertTrue(reply.resources.contains(resourceID));
+
+                       // send an invalid null resource id, intended to make the resource registration
+                       // throw an exception; the message is still expected to be acknowledged
+                       resourceManager.tell(new NotifyResourceStarted(null), fakeJobManager);
+
+                       expectMsgClass(Acknowledge.class);
+
+                       // the failed registration must not change the registered resources
+                       resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+                       reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+                       assertEquals(1, reply.resources.size());
+                       assertTrue(reply.resources.contains(resourceID));
+               }};
+               }};
+       }
+
+       /**
+        * Tests the removal of resources at the ResourceManager.
+        *
+        * <p>Removing an unknown resource leaves the registered resources unchanged. Removing a
+        * registered resource drops it from the registered resources.
+        */
+       @Test
+       public void testResourceRemoval() {
+               new JavaTestKit(system){{
+               new Within(duration("10 seconds")) {
+               @Override
+               protected void run() {
+
+                       fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+                       resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+
+                       // register with JM
+                       expectMsgClass(RegisterResourceManager.class);
+                       resourceManager.tell(
+                               new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
+                               fakeJobManager);
+
+                       ResourceID resourceID = ResourceID.generate();
+
+                       // remove unknown resource
+                       resourceManager.tell(new RemoveResource(resourceID), fakeJobManager);
+
+                       // removing an unknown resource must not change the (still empty) registered resources
+                       resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+                       TestingResourceManager.GetRegisteredResourcesReply reply =
+                               expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+                       assertEquals(0, reply.resources.size());
+
+                       // send task manager registration
+                       resourceManager.tell(new NotifyResourceStarted(resourceID), fakeJobManager);
+
+                       expectMsgClass(Acknowledge.class);
+
+                       // check the number of registered resources
+                       resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+                       reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+                       assertEquals(1, reply.resources.size());
+                       assertTrue(reply.resources.contains(resourceID));
+
+                       // remove resource
+                       resourceManager.tell(new RemoveResource(resourceID), fakeJobManager);
+
+                       // check that the resource is no longer registered
+                       resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+                       reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+                       assertEquals(0, reply.resources.size());
+
+               }};
+               }};
+       }
+
+       /**
+        * Tests notification of the JobManager about failed resources.
+        *
+        * <p>Two resources are registered and then failed one after the other. The JobManager
+        * is expected to receive one {@code ResourceRemoved} message per failed resource, in
+        * the order in which the resources were failed.
+        */
+       @Test
+       public void testResourceFailureNotification() {
+               new JavaTestKit(system){{
+               new Within(duration("10 seconds")) {
+               @Override
+               protected void run() {
+
+                       fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+                       resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+
+                       // register with JM
+                       expectMsgClass(RegisterResourceManager.class);
+                       resourceManager.tell(
+                               new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
+                               fakeJobManager);
+
+                       ResourceID resourceID1 = ResourceID.generate();
+                       ResourceID resourceID2 = ResourceID.generate();
+
+                       // send registration of the first task manager
+                       resourceManager.tell(new NotifyResourceStarted(resourceID1), fakeJobManager);
+
+                       expectMsgClass(Acknowledge.class);
+
+                       // send registration of the second task manager
+                       resourceManager.tell(new NotifyResourceStarted(resourceID2), fakeJobManager);
+
+                       expectMsgClass(Acknowledge.class);
+
+                       // check the number of registered resources
+                       resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+                       TestingResourceManager.GetRegisteredResourcesReply reply =
+                               expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+                       assertEquals(2, reply.resources.size());
+                       assertTrue(reply.resources.contains(resourceID1));
+                       assertTrue(reply.resources.contains(resourceID2));
+
+                       // fail both resources
+                       resourceManager.tell(new TestingResourceManager.FailResource(resourceID1), fakeJobManager);
+                       resourceManager.tell(new TestingResourceManager.FailResource(resourceID2), fakeJobManager);
+
+                       // both failure messages come from the same sender, so the assertions below
+                       // rely on the notifications arriving in the order the resources were failed
+                       ResourceRemoved answer = expectMsgClass(ResourceRemoved.class);
+                       ResourceRemoved answer2 = expectMsgClass(ResourceRemoved.class);
+
+                       assertEquals(resourceID1, answer.resourceId());
+                       assertEquals(resourceID2, answer2.resourceId());
+
+               }};
+               }};
+       }
+}

Reply via email to