http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
deleted file mode 100644
index 4f756fb..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
+++ /dev/null
@@ -1,749 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.HybridMemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.LocalConnectionManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateServer;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.taskexecutor.TaskExecutor;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorConfiguration;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.MathUtils;
-import org.apache.flink.util.NetUtils;
-
-import akka.actor.ActorSystem;
-import akka.util.Timeout;
-import com.typesafe.config.Config;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * This class is the executable entry point for the task manager in yarn or 
standalone mode.
- * It constructs the related components (network, I/O manager, memory manager, 
RPC service, HA service)
- * and starts them.
- */
-public class TaskManagerRunner {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerRunner.class);
-
-       /**
-        * Constructs related components of the TaskManager and starts them.
-        *
-        * @param configuration                 The configuration for the 
TaskManager.
-        * @param resourceID                    The id of the resource which 
the task manager will run on.
-        * @param rpcService                    Optionally, The rpc service 
which is used to start and connect to the TaskManager RpcEndpoint .
-        *                                                 If none is given, 
then a RpcService is constructed from the configuration.
-        * @param taskManagerHostname   Optionally, The hostname/address that 
describes the TaskManager's data location.
-        *                                                 If none is given, it 
can be got from the configuration.
-        * @param localTaskManagerCommunication      If true, the TaskManager 
will not initiate the TCP network stack.
-        * @param haServices                    Optionally, a high availability 
service can be provided. If none is given,
-        *                                                 then a 
HighAvailabilityServices is constructed from the configuration.
-        */
-       public static void createAndStartComponents(
-               final Configuration configuration,
-               final ResourceID resourceID,
-               RpcService rpcService,
-               String taskManagerHostname,
-               boolean localTaskManagerCommunication,
-               HighAvailabilityServices haServices) throws Exception {
-
-               checkNotNull(configuration);
-               checkNotNull(resourceID);
-
-               if (taskManagerHostname == null || 
taskManagerHostname.isEmpty()) {
-                       taskManagerHostname = 
selectNetworkInterface(configuration);
-               }
-
-               if (rpcService == null) {
-                       // if no task manager port has been configured, use 0 
(system will pick any free port)
-                       final int actorSystemPort = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
-                       if (actorSystemPort < 0 || actorSystemPort > 65535) {
-                               throw new 
IllegalConfigurationException("Invalid value for '" +
-                                       
ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
-                                       "' (port for the TaskManager actor 
system) : " + actorSystemPort +
-                                       " - Leave config parameter empty or use 
0 to let the system choose a port automatically.");
-                       }
-                       rpcService = createRpcService(configuration, 
taskManagerHostname, actorSystemPort);
-               }
-
-               if(haServices == null) {
-                       // start high availability service to implement 
getResourceManagerLeaderRetriever method only
-                       haServices = new HighAvailabilityServices() {
-                               @Override
-                               public LeaderRetrievalService 
getResourceManagerLeaderRetriever() throws Exception {
-                                       return 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-                               }
-
-                               @Override
-                               public LeaderElectionService 
getResourceManagerLeaderElectionService() throws Exception {
-                                       return null;
-                               }
-
-                               @Override
-                               public LeaderElectionService 
getJobMasterLeaderElectionService(JobID jobID) throws Exception {
-                                       return null;
-                               }
-                       };
-               }
-
-               createAndStartTaskManagerComponents(
-                       configuration,
-                       resourceID,
-                       rpcService,
-                       taskManagerHostname,
-                       haServices,
-                       localTaskManagerCommunication);
-       }
-
-       /**
-        * <p/>
-        * This method tries to select the network interface to use for the 
TaskManager
-        * communication. The network interface is used both for the actor 
communication
-        * (coordination) as well as for the data exchange between task 
managers. Unless
-        * the hostname/interface is explicitly configured in the 
configuration, this
-        * method will try out various interfaces and methods to connect to the 
JobManager
-        * and select the one where the connection attempt is successful.
-        * <p/>
-        *
-        * @param configuration    The configuration for the TaskManager.
-        * @return  The host name under which the TaskManager communicates.
-        */
-       private static String selectNetworkInterface(Configuration 
configuration) throws Exception {
-               String taskManagerHostname = 
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
-               if (taskManagerHostname != null) {
-                       LOG.info("Using configured hostname/address for 
TaskManager: " + taskManagerHostname);
-               } else {
-                       LeaderRetrievalService leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-                       FiniteDuration lookupTimeout = 
AkkaUtils.getLookupTimeout(configuration);
-
-                       InetAddress taskManagerAddress = 
LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, 
lookupTimeout);
-                       taskManagerHostname = taskManagerAddress.getHostName();
-                       LOG.info("TaskManager will use hostname/address '{}' 
({}) for communication.",
-                               taskManagerHostname, 
taskManagerAddress.getHostAddress());
-               }
-
-               return taskManagerHostname;
-       }
-
-       /**
-        * Utility method to create RPC service from configuration and 
hostname, port.
-        *
-        * @param configuration                 The configuration for the 
TaskManager.
-        * @param taskManagerHostname   The hostname/address that describes the 
TaskManager's data location.
-        * @param actorSystemPort           If true, the TaskManager will not 
initiate the TCP network stack.
-        * @return   The rpc service which is used to start and connect to the 
TaskManager RpcEndpoint .
-        * @throws java.io.IOException      Thrown, if the actor system can not 
bind to the address
-        * @throws java.lang.Exception      Thrown is some other error occurs 
while creating akka actor system
-        */
-       private static RpcService createRpcService(Configuration configuration, 
String taskManagerHostname, int actorSystemPort)
-               throws Exception{
-
-               // Bring up the TaskManager actor system first, bind it to the 
given address.
-
-               LOG.info("Starting TaskManager actor system at " +
-                       NetUtils.hostAndPortToUrlString(taskManagerHostname, 
actorSystemPort));
-
-               final ActorSystem taskManagerSystem;
-               try {
-                       Tuple2<String, Object> address = new Tuple2<String, 
Object>(taskManagerHostname, actorSystemPort);
-                       Config akkaConfig = 
AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
-                       LOG.debug("Using akka configuration\n " + akkaConfig);
-                       taskManagerSystem = 
AkkaUtils.createActorSystem(akkaConfig);
-               } catch (Throwable t) {
-                       if (t instanceof 
org.jboss.netty.channel.ChannelException) {
-                               Throwable cause = t.getCause();
-                               if (cause != null && t.getCause() instanceof 
java.net.BindException) {
-                                       String address = 
NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
-                                       throw new IOException("Unable to bind 
TaskManager actor system to address " +
-                                               address + " - " + 
cause.getMessage(), t);
-                               }
-                       }
-                       throw new Exception("Could not create TaskManager actor 
system", t);
-               }
-
-               // start akka rpc service based on actor system
-               final Timeout timeout = new 
Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS);
-               final AkkaRpcService akkaRpcService = new 
AkkaRpcService(taskManagerSystem, timeout);
-
-               return akkaRpcService;
-       }
-
-       /**
-        * @param configuration                 The configuration for the 
TaskManager.
-        * @param resourceID                    The id of the resource which 
the task manager will run on.
-        * @param rpcService                    The rpc service which is used 
to start and connect to the TaskManager RpcEndpoint .
-        * @param taskManagerHostname   The hostname/address that describes the 
TaskManager's data location.
-        * @param haServices                    Optionally, a high availability 
service can be provided. If none is given,
-        *                                                  then a 
HighAvailabilityServices is constructed from the configuration.
-        * @param localTaskManagerCommunication     If true, the TaskManager 
will not initiate the TCP network stack.
-        * @throws IllegalConfigurationException        Thrown, if the given 
config contains illegal values.
-        * @throws IOException      Thrown, if any of the I/O components (such 
as buffer pools, I/O manager, ...)
-        *                                              cannot be properly 
started.
-        * @throws Exception      Thrown is some other error occurs while 
parsing the configuration or
-        *                                              starting the 
TaskManager components.
-        */
-       private static void createAndStartTaskManagerComponents(
-               Configuration configuration,
-               ResourceID resourceID,
-               RpcService rpcService,
-               String taskManagerHostname,
-               HighAvailabilityServices haServices,
-               boolean localTaskManagerCommunication) throws Exception {
-
-               final TaskExecutorConfiguration taskManagerConfig = 
parseTaskManagerConfiguration(
-                       configuration, taskManagerHostname, 
localTaskManagerCommunication);
-
-               TaskManagerComponents taskManagerComponents = 
createTaskManagerComponents(
-                       resourceID,
-                       InetAddress.getByName(taskManagerHostname),
-                       taskManagerConfig,
-                       configuration);
-
-               final TaskExecutor taskExecutor = new TaskExecutor(
-                       taskManagerConfig,
-                       taskManagerComponents.getTaskManagerLocation(),
-                       rpcService, taskManagerComponents.getMemoryManager(),
-                       taskManagerComponents.getIOManager(),
-                       taskManagerComponents.getNetworkEnvironment(),
-                       haServices);
-
-               taskExecutor.start();
-       }
-
-       /**
-        * Creates and returns the task manager components.
-        *
-        * @param resourceID resource ID of the task manager
-        * @param taskManagerAddress address of the task manager
-        * @param taskExecutorConfig task manager configuration
-        * @param configuration of Flink
-        * @return task manager components
-        * @throws Exception
-        */
-       private static TaskManagerComponents createTaskManagerComponents(
-               ResourceID resourceID,
-               InetAddress taskManagerAddress,
-               TaskExecutorConfiguration taskExecutorConfig,
-               Configuration configuration) throws Exception {
-
-               MemoryType memType = 
taskExecutorConfig.getNetworkConfig().memoryType();
-
-               // pre-start checks
-               checkTempDirs(taskExecutorConfig.getTmpDirPaths());
-
-               NetworkEnvironmentConfiguration networkEnvironmentConfiguration 
= taskExecutorConfig.getNetworkConfig();
-
-               NetworkBufferPool networkBufferPool = new NetworkBufferPool(
-                       networkEnvironmentConfiguration.numNetworkBuffers(),
-                       networkEnvironmentConfiguration.networkBufferSize(),
-                       networkEnvironmentConfiguration.memoryType());
-
-               ConnectionManager connectionManager;
-
-               if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
-                       connectionManager = new 
NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get());
-               } else {
-                       connectionManager = new LocalConnectionManager();
-               }
-
-               ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
-               TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
-
-               KvStateRegistry kvStateRegistry = new KvStateRegistry();
-
-               KvStateServer kvStateServer;
-
-               if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
-                       NettyConfig nettyConfig = 
networkEnvironmentConfiguration.nettyConfig().get();
-
-                       int numNetworkThreads = 
networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
-                               nettyConfig.getNumberOfSlots() : 
networkEnvironmentConfiguration.queryServerNetworkThreads();
-
-                       int numQueryThreads = 
networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
-                               nettyConfig.getNumberOfSlots() : 
networkEnvironmentConfiguration.queryServerQueryThreads();
-
-                       kvStateServer = new KvStateServer(
-                               taskManagerAddress,
-                               
networkEnvironmentConfiguration.queryServerPort(),
-                               numNetworkThreads,
-                               numQueryThreads,
-                               kvStateRegistry,
-                               new DisabledKvStateRequestStats());
-               } else {
-                       kvStateServer = null;
-               }
-
-               // we start the network first, to make sure it can allocate its 
buffers first
-               final NetworkEnvironment network = new NetworkEnvironment(
-                       networkBufferPool,
-                       connectionManager,
-                       resultPartitionManager,
-                       taskEventDispatcher,
-                       kvStateRegistry,
-                       kvStateServer,
-                       networkEnvironmentConfiguration.ioMode(),
-                       
networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
-                       
networkEnvironmentConfiguration.partitinRequestMaxBackoff());
-
-               network.start();
-
-               final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(
-                       resourceID,
-                       taskManagerAddress,
-                       network.getConnectionManager().getDataPort());
-
-               // computing the amount of memory to use depends on how much 
memory is available
-               // it strictly needs to happen AFTER the network stack has been 
initialized
-
-               // check if a value has been configured
-               long configuredMemory = 
configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
-               checkConfigParameter(configuredMemory == -1 || configuredMemory 
> 0, configuredMemory,
-                       ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
-                       "MemoryManager needs at least one MB of memory. " +
-                               "If you leave this config parameter empty, the 
system automatically " +
-                               "pick a fraction of the available memory.");
-
-               final long memorySize;
-               boolean preAllocateMemory = configuration.getBoolean(
-                       ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
-                       
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
-               if (configuredMemory > 0) {
-                       if (preAllocateMemory) {
-                               LOG.info("Using {} MB for managed memory." , 
configuredMemory);
-                       } else {
-                               LOG.info("Limiting managed memory to {} MB, 
memory will be allocated lazily." , configuredMemory);
-                       }
-                       memorySize = configuredMemory << 20; // megabytes to 
bytes
-               } else {
-                       float fraction = configuration.getFloat(
-                               
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-                               
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
-                       checkConfigParameter(fraction > 0.0f && fraction < 
1.0f, fraction,
-                               
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-                               "MemoryManager fraction of the free memory must 
be between 0.0 and 1.0");
-
-                       if (memType == MemoryType.HEAP) {
-                               long relativeMemSize = (long) 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
-                               if (preAllocateMemory) {
-                                       LOG.info("Using {} of the currently 
free heap space for managed heap memory ({} MB)." ,
-                                               fraction , relativeMemSize >> 
20);
-                               } else {
-                                       LOG.info("Limiting managed memory to {} 
of the currently free heap space ({} MB), " +
-                                               "memory will be allocated 
lazily." , fraction , relativeMemSize >> 20);
-                               }
-                               memorySize = relativeMemSize;
-                       } else if (memType == MemoryType.OFF_HEAP) {
-                               // The maximum heap memory has been adjusted 
according to the fraction
-                               long maxMemory = 
EnvironmentInformation.getMaxJvmHeapMemory();
-                               long directMemorySize = (long) (maxMemory / 
(1.0 - fraction) * fraction);
-                               if (preAllocateMemory) {
-                                       LOG.info("Using {} of the maximum 
memory size for managed off-heap memory ({} MB)." ,
-                                               fraction, directMemorySize >> 
20);
-                               } else {
-                                       LOG.info("Limiting managed memory to {} 
of the maximum memory size ({} MB)," +
-                                               " memory will be allocated 
lazily.", fraction, directMemorySize >> 20);
-                               }
-                               memorySize = directMemorySize;
-                       } else {
-                               throw new RuntimeException("No supported memory 
type detected.");
-                       }
-               }
-
-               // now start the memory manager
-               final MemoryManager memoryManager;
-               try {
-                       memoryManager = new MemoryManager(
-                               memorySize,
-                               taskExecutorConfig.getNumberOfSlots(),
-                               
taskExecutorConfig.getNetworkConfig().networkBufferSize(),
-                               memType,
-                               preAllocateMemory);
-               } catch (OutOfMemoryError e) {
-                       if (memType == MemoryType.HEAP) {
-                               throw new Exception("OutOfMemory error (" + 
e.getMessage() +
-                                       ") while allocating the TaskManager 
heap memory (" + memorySize + " bytes).", e);
-                       } else if (memType == MemoryType.OFF_HEAP) {
-                               throw new Exception("OutOfMemory error (" + 
e.getMessage() +
-                                       ") while allocating the TaskManager 
off-heap memory (" + memorySize +
-                                       " bytes).Try increasing the maximum 
direct memory (-XX:MaxDirectMemorySize)", e);
-                       } else {
-                               throw e;
-                       }
-               }
-
-               // start the I/O manager, it will create some temp directories.
-               final IOManager ioManager = new 
IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
-
-               return new TaskManagerComponents(taskManagerLocation, 
memoryManager, ioManager, network);
-       }
-
-       // 
--------------------------------------------------------------------------
-       //  Parsing and checking the TaskManager Configuration
-       // 
--------------------------------------------------------------------------
-
-       /**
-        * Utility method to extract TaskManager config parameters from the 
configuration and to
-        * sanity check them.
-        *
-        * @param configuration                         The configuration.
-        * @param taskManagerHostname           The host name under which the 
TaskManager communicates.
-        * @param localTaskManagerCommunication             True, to skip 
initializing the network stack.
-        *                                      Use only in cases where only 
one task manager runs.
-        * @return TaskExecutorConfiguration that wrappers 
InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
-        */
-       private static TaskExecutorConfiguration parseTaskManagerConfiguration(
-               Configuration configuration,
-               String taskManagerHostname,
-               boolean localTaskManagerCommunication) throws Exception {
-
-               // ------- read values from the config and check them ---------
-               //                      (a lot of them)
-
-               // ----> hosts / ports for communication and data exchange
-
-               int dataport = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-                       ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
-
-               checkConfigParameter(dataport > 0, dataport, 
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-                       "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
-
-               InetAddress taskManagerAddress = 
InetAddress.getByName(taskManagerHostname);
-               final InetSocketAddress taskManagerInetSocketAddress = new 
InetSocketAddress(taskManagerAddress, dataport);
-
-               // ----> memory / network stack (shuffles/broadcasts), task 
slots, temp directories
-
-               // we need this because many configs have been written with a 
"-1" entry
-               int slots = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-               if (slots == -1) {
-                       slots = 1;
-               }
-
-               checkConfigParameter(slots >= 1, slots, 
ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-                       "Number of task slots must be at least one.");
-
-               final int numNetworkBuffers = configuration.getInteger(
-                       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-                       
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
-
-               checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
-                       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 
"");
-
-               final int pageSize = configuration.getInteger(
-                       ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-                       
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
-
-               // check page size of for minimum size
-               checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, 
pageSize,
-                       ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-                       "Minimum memory segment size is " + 
MemoryManager.MIN_PAGE_SIZE);
-
-               // check page size for power of two
-               checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
-                       ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-                       "Memory segment size must be a power of 2.");
-
-               // check whether we use heap or off-heap memory
-               final MemoryType memType;
-               if 
(configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, 
false)) {
-                       memType = MemoryType.OFF_HEAP;
-               } else {
-                       memType = MemoryType.HEAP;
-               }
-
-               // initialize the memory segment factory accordingly
-               if (memType == MemoryType.HEAP) {
-                       if 
(!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
-                               throw new Exception("Memory type is set to heap 
memory, but memory segment " +
-                                       "factory has been initialized for 
off-heap memory segments");
-                       }
-               } else {
-                       if 
(!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) 
{
-                               throw new Exception("Memory type is set to 
off-heap memory, but memory segment " +
-                                       "factory has been initialized for heap 
memory segments");
-                       }
-               }
-
-               final String[] tmpDirs = configuration.getString(
-                       ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-                       
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
-
-               final NettyConfig nettyConfig;
-               if (!localTaskManagerCommunication) {
-                       nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(),
-                               taskManagerInetSocketAddress.getPort(), 
pageSize, slots, configuration);
-               } else {
-                       nettyConfig = null;
-               }
-
-               // Default spill I/O mode for intermediate results
-               final String syncOrAsync = configuration.getString(
-                       ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
-                       
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
-
-               final IOManager.IOMode ioMode;
-               if (syncOrAsync.equals("async")) {
-                       ioMode = IOManager.IOMode.ASYNC;
-               } else {
-                       ioMode = IOManager.IOMode.SYNC;
-               }
-
-               final int queryServerPort =  configuration.getInteger(
-                       ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
-                       ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
-
-               final int queryServerNetworkThreads =  configuration.getInteger(
-                       ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
-                       
ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
-
-               final int queryServerQueryThreads =  configuration.getInteger(
-                       ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
-                       
ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
-
-               final NetworkEnvironmentConfiguration networkConfig = new 
NetworkEnvironmentConfiguration(
-                       numNetworkBuffers,
-                       pageSize,
-                       memType,
-                       ioMode,
-                       queryServerPort,
-                       queryServerNetworkThreads,
-                       queryServerQueryThreads,
-                       Option.apply(nettyConfig),
-                       500,
-                       3000);
-
-               // ----> timeouts, library caching, profiling
-
-               final FiniteDuration timeout;
-               try {
-                       timeout = AkkaUtils.getTimeout(configuration);
-               } catch (Exception e) {
-                       throw new IllegalArgumentException(
-                               "Invalid format for '" + 
ConfigConstants.AKKA_ASK_TIMEOUT +
-                                       "'.Use formats like '50 s' or '1 min' 
to specify the timeout.");
-               }
-               LOG.info("Messages between TaskManager and JobManager have a 
max timeout of " + timeout);
-
-               final long cleanupInterval = configuration.getLong(
-                       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-                       
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
-
-               final FiniteDuration finiteRegistrationDuration;
-               try {
-                       Duration maxRegistrationDuration = 
Duration.create(configuration.getString(
-                               
ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
-                               
ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
-                       if (maxRegistrationDuration.isFinite()) {
-                               finiteRegistrationDuration = new 
FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS);
-                       } else {
-                               finiteRegistrationDuration = null;
-                       }
-               } catch (NumberFormatException e) {
-                       throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
-               }
-
-               final FiniteDuration initialRegistrationPause;
-               try {
-                       Duration pause = 
Duration.create(configuration.getString(
-                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-                               
ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
-                       if (pause.isFinite()) {
-                               initialRegistrationPause = new 
FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
-                       } else {
-                               throw new IllegalArgumentException("The initial 
registration pause must be finite: " + pause);
-                       }
-               } catch (NumberFormatException e) {
-                       throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
-               }
-
-               final FiniteDuration maxRegistrationPause;
-               try {
-                       Duration pause = 
Duration.create(configuration.getString(
-                               
ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
-                               
ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
-                       if (pause.isFinite()) {
-                               maxRegistrationPause = new 
FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
-                       } else {
-                               throw new IllegalArgumentException("The maximum 
registration pause must be finite: " + pause);
-                       }
-               } catch (NumberFormatException e) {
-                       throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
-               }
-
-               final FiniteDuration refusedRegistrationPause;
-               try {
-                       Duration pause = 
Duration.create(configuration.getString(
-                               
ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
-                               
ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
-                       if (pause.isFinite()) {
-                               refusedRegistrationPause = new 
FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
-                       } else {
-                               throw new IllegalArgumentException("The refused 
registration pause must be finite: " + pause);
-                       }
-               } catch (NumberFormatException e) {
-                       throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
-               }
-
-               return new TaskExecutorConfiguration(
-                       tmpDirs,
-                       cleanupInterval,
-                       networkConfig,
-                       timeout,
-                       finiteRegistrationDuration,
-                       slots,
-                       configuration,
-                       initialRegistrationPause,
-                       maxRegistrationPause,
-                       refusedRegistrationPause);
-       }
-
-       /**
-        * Validates a condition for a config parameter and displays a standard 
exception, if the
-        * the condition does not hold.
-        *
-        * @param condition             The condition that must hold. If the 
condition is false, an exception is thrown.
-        * @param parameter         The parameter value. Will be shown in the 
exception message.
-        * @param name              The name of the config parameter. Will be 
shown in the exception message.
-        * @param errorMessage  The optional custom error message to append to 
the exception message.
-        */
-       private static void checkConfigParameter(
-               boolean condition,
-               Object parameter,
-               String name,
-               String errorMessage) {
-               if (!condition) {
-                       throw new IllegalConfigurationException("Invalid 
configuration value for " + name + " : " + parameter + " - " + errorMessage);
-               }
-       }
-
-       /**
-        * Validates that all the directories denoted by the strings do 
actually exist, are proper
-        * directories (not files), and are writable.
-        *
-        * @param tmpDirs       The array of directory paths to check.
-        * @throws Exception    Thrown if any of the directories does not exist 
or is not writable
-        *                   or is a file, rather than a directory.
-        */
-       private static void checkTempDirs(String[] tmpDirs) throws IOException {
-               for (String dir : tmpDirs) {
-                       if (dir != null && !dir.equals("")) {
-                               File file = new File(dir);
-                               if (!file.exists()) {
-                                       throw new IOException("Temporary file 
directory " + file.getAbsolutePath() + " does not exist.");
-                               }
-                               if (!file.isDirectory()) {
-                                       throw new IOException("Temporary file 
directory " + file.getAbsolutePath() + " is not a directory.");
-                               }
-                               if (!file.canWrite()) {
-                                       throw new IOException("Temporary file 
directory " + file.getAbsolutePath() + " is not writable.");
-                               }
-
-                               if (LOG.isInfoEnabled()) {
-                                       long totalSpaceGb = 
file.getTotalSpace() >> 30;
-                                       long usableSpaceGb = 
file.getUsableSpace() >> 30;
-                                       double usablePercentage = 
(double)usableSpaceGb / totalSpaceGb * 100;
-                                       String path = file.getAbsolutePath();
-                                       LOG.info(String.format("Temporary file 
directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
-                                               path, totalSpaceGb, 
usableSpaceGb, usablePercentage));
-                               }
-                       } else {
-                               throw new IllegalArgumentException("Temporary 
file directory #$id is null.");
-                       }
-               }
-       }
-
-       private static class TaskManagerComponents {
-               private final TaskManagerLocation taskManagerLocation;
-               private final MemoryManager memoryManager;
-               private final IOManager ioManager;
-               private final NetworkEnvironment networkEnvironment;
-
-               private TaskManagerComponents(
-                       TaskManagerLocation taskManagerLocation,
-                       MemoryManager memoryManager,
-                       IOManager ioManager,
-                       NetworkEnvironment networkEnvironment) {
-
-                       this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
-                       this.memoryManager = 
Preconditions.checkNotNull(memoryManager);
-                       this.ioManager = Preconditions.checkNotNull(ioManager);
-                       this.networkEnvironment = 
Preconditions.checkNotNull(networkEnvironment);
-               }
-
-               public MemoryManager getMemoryManager() {
-                       return memoryManager;
-               }
-
-               public IOManager getIOManager() {
-                       return ioManager;
-               }
-
-               public NetworkEnvironment getNetworkEnvironment() {
-                       return networkEnvironment;
-               }
-
-               public TaskManagerLocation getTaskManagerLocation() {
-                       return taskManagerLocation;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index b6d9306..42655a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -170,6 +171,12 @@ public class LeaderRetrievalUtils {
        }
 
        public static InetAddress findConnectingAddress(
+               LeaderRetrievalService leaderRetrievalService,
+               Time timeout) throws LeaderRetrievalException {
+               return findConnectingAddress(leaderRetrievalService, new 
FiniteDuration(timeout.getSize(), timeout.getUnit()));
+       }
+
+       public static InetAddress findConnectingAddress(
                        LeaderRetrievalService leaderRetrievalService,
                        FiniteDuration timeout) throws LeaderRetrievalException 
{
                ConnectionUtils.LeaderConnectingAddressListener listener = new 
ConnectionUtils.LeaderConnectingAddressListener();

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index bd3af33..84f5ac7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -94,6 +94,10 @@ object AkkaUtils {
     createActorSystem(getDefaultAkkaConfig)
   }
 
+  def getAkkaConfig(configuration: Configuration, hostname: String, port: 
Int): Config = {
+    getAkkaConfig(configuration, if (hostname == null) Some((hostname, port)) 
else None)
+  }
+
   /**
    * Creates an akka config with the provided configuration values. If the 
listening address is
    * specified, then the actor system will listen on the respective address.

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 893eaa8..97aae34 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -30,6 +30,6 @@ case class NetworkEnvironmentConfiguration(
   queryServerPort: Int,
   queryServerNetworkThreads: Int,
   queryServerQueryThreads: Int,
-  nettyConfig: Option[NettyConfig] = None,
+  nettyConfig: NettyConfig = null,
   partitionRequestInitialBackoff: Int = 500,
   partitinRequestMaxBackoff: Int = 3000)

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index af2b38f..79670a4 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1932,7 +1932,7 @@ object TaskManager {
       netConfig.networkBufferSize,
       netConfig.memoryType)
 
-    val connectionManager = netConfig.nettyConfig match {
+    val connectionManager = Option(netConfig.nettyConfig) match {
       case Some(nettyConfig) => new NettyConnectionManager(nettyConfig)
       case None => new LocalConnectionManager()
     }
@@ -1942,7 +1942,7 @@ object TaskManager {
 
     val kvStateRegistry = new KvStateRegistry()
 
-    val kvStateServer = netConfig.nettyConfig match {
+    val kvStateServer = Option(netConfig.nettyConfig) match {
       case Some(nettyConfig) =>
 
         val numNetworkThreads = if (netConfig.queryServerNetworkThreads == 0) {
@@ -2274,7 +2274,7 @@ object TaskManager {
       queryServerPort,
       queryServerNetworkThreads,
       queryServerQueryThreads,
-      nettyConfig)
+      nettyConfig.getOrElse(null))
 
     // ----> timeouts, library caching, profiling
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index a9ad75d..cc50b66 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -38,7 +37,6 @@ import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
-import scala.Some;
 import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
@@ -75,7 +73,7 @@ public class NetworkEnvironmentTest {
                        0,
                        0,
                        0,
-                       Some.<NettyConfig>empty(),
+                       null,
                        0,
                        0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 5b8e6e6..2a004c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -30,7 +30,6 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
-import java.net.InetAddress;
 import java.util.BitSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 26218dd..9c1f288 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -26,8 +26,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
@@ -51,8 +52,8 @@ public class TaskExecutorTest extends TestLogger {
                try {
                        // register a mock resource manager gateway
                        ResourceManagerGateway rmGateway = 
mock(ResourceManagerGateway.class);
-                       TaskExecutorConfiguration taskExecutorConfiguration = 
mock(TaskExecutorConfiguration.class);
-                       
PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1);
+                       TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
+                       
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
                        rpc.registerGateway(resourceManagerAddress, rmGateway);
 
                        TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
@@ -61,12 +62,15 @@ public class TaskExecutorTest extends TestLogger {
                        NonHaServices haServices = new 
NonHaServices(resourceManagerAddress);
 
                        TaskExecutor taskManager = new TaskExecutor(
-                               taskExecutorConfiguration,
+                               taskManagerServicesConfiguration,
                                taskManagerLocation,
-                               rpc, mock(MemoryManager.class),
+                               rpc,
+                               mock(MemoryManager.class),
                                mock(IOManager.class),
                                mock(NetworkEnvironment.class),
-                               haServices);
+                               haServices,
+                               mock(MetricRegistry.class),
+                               mock(FatalErrorHandler.class));
 
                        taskManager.start();
                        String taskManagerAddress = taskManager.getAddress();
@@ -101,19 +105,22 @@ public class TaskExecutorTest extends TestLogger {
                        TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
                        
haServices.setResourceManagerLeaderRetriever(testLeaderService);
 
-                       TaskExecutorConfiguration taskExecutorConfiguration = 
mock(TaskExecutorConfiguration.class);
-                       
PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1);
+                       TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
+                       
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
 
                        TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
                        
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
 
                        TaskExecutor taskManager = new TaskExecutor(
-                               taskExecutorConfiguration,
+                               taskManagerServicesConfiguration,
                                taskManagerLocation,
-                               rpc, mock(MemoryManager.class),
+                               rpc,
+                               mock(MemoryManager.class),
                                mock(IOManager.class),
                                mock(NetworkEnvironment.class),
-                               haServices);
+                               haServices,
+                               mock(MetricRegistry.class),
+                               mock(FatalErrorHandler.class));
 
                        taskManager.start();
                        String taskManagerAddress = taskManager.getAddress();

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 1f93e9b..627a25a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -39,7 +39,6 @@ import 
org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -105,7 +104,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 
                        final NetworkEnvironmentConfiguration netConf = new 
NetworkEnvironmentConfiguration(
                                        32, BUFFER_SIZE, MemoryType.HEAP, 
IOManager.IOMode.SYNC, 0, 0, 0,
-                                       Option.<NettyConfig>empty(), 0, 0);
+                                       null, 0, 0);
 
                        ResourceID taskManagerId = ResourceID.generate();
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
index acfbbfd..c0d0455 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -29,7 +29,6 @@ import org.junit.Test;
 import scala.Tuple2;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.lang.reflect.Field;

Reply via email to