http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index d78a594..f974946 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.taskmanager
 
 import java.io.{File, IOException}
 import java.net.{InetAddress, InetSocketAddress}
+import java.util.UUID
 import java.util.concurrent.TimeUnit
 import java.lang.reflect.Method
 import java.lang.management.{OperatingSystemMXBean, ManagementFactory}
 
-import akka.actor._
-import akka.pattern.ask
-import akka.util.Timeout
+import _root_.akka.actor._
+import _root_.akka.pattern.ask
+import _root_.akka.util.Timeout
 
 import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
 import com.codahale.metrics.json.MetricsModule
@@ -36,9 +37,10 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException}
+
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
-import org.apache.flink.runtime.accumulators.{AccumulatorSnapshot, AccumulatorRegistry}
-import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessages, LogMessages, StreamingMode}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -46,7 +48,8 @@ import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, Ta
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
-import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription,
+InstanceConnectionInfo, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.NetworkEnvironment
@@ -114,15 +117,20 @@ import scala.language.postfixOps
  *    - Exceptions releasing intermediate result resources. Critical resource leak,
  *      requires a clean JVM.
  */
-class TaskManager(protected val config: TaskManagerConfiguration,
-                  protected val connectionInfo: InstanceConnectionInfo,
-                  protected val jobManagerAkkaURL: String,
-                  protected val memoryManager: MemoryManager,
-                  protected val ioManager: IOManager,
-                  protected val network: NetworkEnvironment,
-                  protected val numberOfSlots: Int)
-
-extends Actor with ActorLogMessages with ActorSynchronousLogging {
+class TaskManager(
+    protected val config: TaskManagerConfiguration,
+    protected val connectionInfo: InstanceConnectionInfo,
+    protected val jobManagerAkkaURL: String,
+    protected val memoryManager: MemoryManager,
+    protected val ioManager: IOManager,
+    protected val network: NetworkEnvironment,
+    protected val numberOfSlots: Int)
+  extends FlinkActor
+  with LeaderSessionMessages // Mixin order is important: second we want to filter leader messages
+  with LogMessages // Mixin order is important: first we want to support message logging
+{
+
+  override val log = Logger(getClass)
 
   /** The timeout for all actor ask futures */
   protected val askTimeout = new Timeout(config.timeout)
@@ -161,6 +169,10 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   private var heartbeatScheduler: Option[Cancellable] = None
 
+  protected var leaderSessionID: Option[UUID] = None
+
+  private var currentRegistrationSessionID: UUID = UUID.randomUUID()
+
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -183,8 +195,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
     // kick off the registration
     val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
 
-    self.tell(TriggerTaskManagerRegistration(jobManagerAkkaURL,
-                   TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline,1), ActorRef.noSender)
+    self ! decorateMessage(
+      TriggerTaskManagerRegistration(
+        currentRegistrationSessionID,
+        jobManagerAkkaURL,
+        TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+        deadline,
+        1)
+    )
+
   }
 
   /**
@@ -236,8 +255,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
    * Central handling of actor messages. This method delegates to the more specialized
    * methods for handling certain classes of messages.
    */
-  override def receiveWithLogMessages: Receive = {
-
+  override def handleMessage: Receive = {
     // task messages are most common and critical, we handle them first
     case message: TaskMessage => handleTaskMessage(message)
 
@@ -259,7 +277,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
     // its registration at the JobManager
     case NotifyWhenRegisteredAtJobManager =>
       if (isConnected) {
-        sender ! RegisteredAtJobManager
+        sender ! decorateMessage(RegisteredAtJobManager)
       } else {
         waitForRegistration += sender
       }
@@ -339,12 +357,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         // was into a terminal state, or in case the JobManager cannot be informed of the
         // state transition
 
-        case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
+      case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
 
           // we receive these from our tasks and forward them to the JobManager
           currentJobManager foreach {
             jobManager => {
-              val futureResponse = (jobManager ? updateMsg)(askTimeout)
+            val futureResponse = (jobManager ? decorateMessage(updateMsg))(askTimeout)
 
               val executionID = taskExecutionState.getID
 
@@ -353,13 +371,20 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
                 //            but only send messages to the TaskManager to do those changes
                 case Success(result) =>
                   if (!result) {
-                    self ! FailTask(executionID,
+                  self ! decorateMessage(
+                    FailTask(
+                      executionID,
                      new Exception("Task has been cancelled on the JobManager."))
+                  )
                   }
 
                 case Failure(t) =>
-                  self ! FailTask(executionID, new Exception(
-                    "Failed to send ExecutionStateChange notification to JobManager"))
+                self ! decorateMessage(
+                  FailTask(
+                    executionID,
+                    new Exception(
+                      "Failed to send ExecutionStateChange notification to JobManager"))
+                )
               }(context.dispatcher)
             }
           }
@@ -387,11 +412,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
           val task = runningTasks.get(executionID)
           if (task != null) {
             task.cancelExecution()
-            sender ! new TaskOperationResult(executionID, true)
+          sender ! decorateMessage(new TaskOperationResult(executionID, true))
           } else {
            log.debug(s"Cannot find task to cancel for execution ${executionID})")
-            sender ! new TaskOperationResult(executionID, false,
+          sender ! decorateMessage(
+            new TaskOperationResult(
+              executionID,
+              false,
             "No task with that execution ID was found.")
+          )
           }
 
         case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
@@ -413,7 +442,6 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
   private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {
 
     actorMessage match {
-
       case message: TriggerCheckpoint =>
         val taskExecutionId = message.getTaskExecutionId
         val checkpointId = message.getCheckpointId
@@ -457,118 +485,148 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
    * @param message The registration message.
    */
   private def handleRegistrationMessage(message: RegistrationMessage): Unit = {
+    if(message.registrationSessionID.equals(currentRegistrationSessionID)) {
+      message match {
+        case TriggerTaskManagerRegistration(
+          registrationSessionID,
+          jobManagerURL,
+          timeout,
+          deadline,
+          attempt) =>
+
+          if (isConnected) {
+            // this may be the case, if we queue another attempt and
+            // in the meantime, the registration is acknowledged
+            log.debug(
+              "TaskManager was triggered to register at JobManager, but is already registered")
+          }
+          else if (deadline.exists(_.isOverdue())) {
+            // we failed to register in time. that means we should quit
+            log.error("Failed to register at the JobManager within the defined maximum " +
+              "connect time. Shutting down ...")
 
-    message match {
-
-      case TriggerTaskManagerRegistration(jobManagerURL, timeout, deadline, attempt) =>
-        if (isConnected) {
-          // this may be the case, if we queue another attempt and
-          // in the meantime, the registration is acknowledged
-          log.debug(
-            "TaskManager was triggered to register at JobManager, but is already registered")
-        }
-        else if (deadline.exists(_.isOverdue())) {
-          // we failed to register in time. that means we should quit
-          log.error("Failed to register at the JobManager withing the defined maximum " +
-            "connect time. Shutting down ...")
+            // terminate ourselves (hasta la vista)
+            self ! decorateMessage(PoisonPill)
+          }
+          else {
+            log.info(s"Trying to register at JobManager ${jobManagerURL} " +
+              s"(attempt ${attempt}, timeout: ${timeout})")
+
+            val jobManager = context.actorSelection(jobManagerAkkaURL)
+            jobManager ! decorateMessage(
+              RegisterTaskManager(
+                registrationSessionID,
+                self,
+                connectionInfo,
+                resources,
+                numberOfSlots)
+            )
+
+            // the next timeout computes via exponential backoff with cap
+            val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
+
+            // schedule (with a delay of our timeout) a check that triggers a new registration
+            // attempt, if we are not registered by then
+            context.system.scheduler.scheduleOnce(timeout) {
+              if (!isConnected) {
+                self ! decorateMessage(
+                  TriggerTaskManagerRegistration(
+                    registrationSessionID,
+                    jobManagerURL,
+                    nextTimeout,
+                    deadline,
+                    attempt + 1)
+                )
+              }
+            }(context.dispatcher)
+          }
 
-          // terminate ourselves (hasta la vista)
-          self ! PoisonPill
-        }
-        else {
-          log.info(s"Trying to register at JobManager ${jobManagerURL} " +
-            s"(attempt ${attempt}, timeout: ${timeout})")
-
-          val jobManager = context.actorSelection(jobManagerAkkaURL)
-          jobManager ! RegisterTaskManager(self, connectionInfo, resources, numberOfSlots)
-
-          // the next timeout computes via exponential backoff with cap
-          val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
-
-          // schedule (with our timeout s delay) a check triggers a new registration
-          // attempt, if we are not registered by then
-          context.system.scheduler.scheduleOnce(timeout) {
-            if (!isConnected) {
-              self.tell(TriggerTaskManagerRegistration(jobManagerURL,
-                                 nextTimeout, deadline, attempt + 1), ActorRef.noSender)
+        // successful registration. associate with the JobManager
+        // we disambiguate duplicate or erroneous messages, to simplify debugging
+        case AcknowledgeRegistration(_, leaderSessionID, jobManager, id, blobPort) =>
+          if (isConnected) {
+            if (jobManager == currentJobManager.orNull) {
+              log.debug("Ignoring duplicate registration acknowledgement.")
+            } else {
+              log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
+                s"because the TaskManager is already registered at ${currentJobManager.orNull}")
             }
-          }(context.dispatcher)
-        }
-
-      // successful registration. associate with the JobManager
-      // we disambiguate duplicate or erroneous messages, to simplify debugging
-      case AcknowledgeRegistration(jobManager, id, blobPort) =>
-        if (isConnected) {
-          if (jobManager == currentJobManager.orNull) {
-            log.debug("Ignoring duplicate registration acknowledgement.")
-          } else {
-            log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
-              s"because the TaskManager is already registered at ${currentJobManager.orNull}")
           }
-        }
-        else {
-          // not yet connected, so let's associate with that JobManager
-          try {
-            associateWithJobManager(jobManager, id, blobPort)
-          } catch {
-            case t: Throwable =>
-              killTaskManagerFatal(
-                "Unable to start TaskManager components after registering at JobManager", t)
+          else {
+            // not yet connected, so let's associate with that JobManager
+            try {
+              associateWithJobManager(jobManager, id, blobPort, leaderSessionID)
+            } catch {
+              case t: Throwable =>
+                killTaskManagerFatal(
+                  "Unable to start TaskManager components after registering at JobManager", t)
+            }
           }
-        }
 
-      // we are already registered at that specific JobManager - duplicate answer, rare cases
-      case AlreadyRegistered(jobManager, id, blobPort) =>
-        if (isConnected) {
-          if (jobManager == currentJobManager.orNull) {
-            log.debug("Ignoring duplicate registration acknowledgement.")
-          } else {
-            log.warn(s"Received 'AlreadyRegistered' message from JobManager ${jobManager.path}, " +
-              s"even through TaskManager is currently registered at ${currentJobManager.orNull}")
+        // we are already registered at that specific JobManager - duplicate answer, rare cases
+        case AlreadyRegistered(_, leaderSessionID, jobManager, id, blobPort) =>
+          if (isConnected) {
+            if (jobManager == currentJobManager.orNull) {
+              log.debug("Ignoring duplicate registration acknowledgement.")
+            } else {
+              log.warn(s"Received 'AlreadyRegistered' message from " +
+                s"JobManager ${jobManager.path}, even though TaskManager is currently " +
+                s"registered at ${currentJobManager.orNull}")
+            }
           }
-        }
-        else {
-          // not connected, yet, to let's associate
-          log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
-
-          try {
-            associateWithJobManager(jobManager, id, blobPort)
-          } catch {
-            case t: Throwable =>
-              killTaskManagerFatal(
-                "Unable to start TaskManager components after registering at JobManager", t)
+          else {
+            // not connected yet, so let's associate
+            log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
+
+            try {
+              associateWithJobManager(jobManager, id, blobPort, leaderSessionID)
+            } catch {
+              case t: Throwable =>
+                killTaskManagerFatal(
+                  "Unable to start TaskManager components after registering at JobManager", t)
+            }
           }
-        }
 
-      case RefuseRegistration(reason) =>
-        if (currentJobManager.isEmpty) {
-          log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
-                    s"because: ${reason}. Retrying later...")
+        case RefuseRegistration(registrationSessionID, reason) =>
+          if (currentJobManager.isEmpty) {
+            log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
+              s"because: ${reason}. Retrying later...")
 
-          // try the registration again after some time
+            // try the registration again after some time
 
-          val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
-          val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
-            timeout => timeout + delay fromNow
-          }
+            val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
+            val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
+              timeout => timeout + delay fromNow
+            }
 
-          context.system.scheduler.scheduleOnce(delay) {
-            self.tell(TriggerTaskManagerRegistration(jobManagerAkkaURL,
-                  TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline, 1), ActorRef.noSender)
-          }(context.dispatcher)
-        }
-        else {
-          // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
-          if (sender() == currentJobManager.orNull) {
-            log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
-                     s" even though this TaskManager is already registered there.")
+            context.system.scheduler.scheduleOnce(delay) {
+              self ! decorateMessage(
+                TriggerTaskManagerRegistration(
+                  registrationSessionID,
+                  jobManagerAkkaURL,
+                  TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+                  deadline,
+                  1)
+              )
+            }(context.dispatcher)
           }
           else {
-            log.warn(s"Ignoring 'RefuseRegistration' from unrelated JobManager (${sender().path})")
+            // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
+            if (sender() == currentJobManager.orNull) {
+              log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
+                s" even though this TaskManager is already registered there.")
+            }
+            else {
+              log.warn(s"Ignoring 'RefuseRegistration' from unrelated " +
+                s"JobManager (${sender().path})")
+            }
           }
-        }
 
-      case _ => unhandled(message)
+        case _ => unhandled(message)
+      }
+    } else {
+      log.debug(s"Discarded registration message ${message}, because the registration session " +
+        "ID was not correct.")
     }
   }
 
@@ -593,20 +651,28 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
    * @param id The instanceID under which the TaskManager is registered
    *           at the JobManager.
    * @param blobPort The JobManager's port for the BLOB server.
+   * @param newLeaderSessionID Leader session ID of the JobManager
    */
-  private def associateWithJobManager(jobManager: ActorRef,
-                                      id: InstanceID,
-                                      blobPort: Int): Unit = {
+  private def associateWithJobManager(
+      jobManager: ActorRef,
+      id: InstanceID,
+      blobPort: Int,
+      newLeaderSessionID: UUID)
+    : Unit = {
 
     if (jobManager == null) {
-      throw new NullPointerException("jobManager may not be null")
+      throw new NullPointerException("jobManager must not be null.")
     }
     if (id == null) {
-      throw new NullPointerException("instance ID may not be null")
+      throw new NullPointerException("instance ID must not be null.")
     }
     if (blobPort <= 0 || blobPort > 65535) {
       throw new IllegalArgumentException("blob port is out of range: " + blobPort)
     }
+    if(newLeaderSessionID == null) {
+      throw new NullPointerException("Leader session ID must not be null.")
+    }
+
 
     // sanity check that we are not currently registered with a different JobManager
     if (isConnected) {
@@ -631,9 +697,18 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       throw new IllegalStateException("JobManager-specific components are already initialized.")
     }
 
+    currentJobManager = Some(jobManager)
+    instanceID = id
+    leaderSessionID = Some(newLeaderSessionID)
+
     // start the network stack, now that we have the JobManager actor reference
     try {
-      network.associateWithTaskManagerAndJobManager(jobManager, self)
+      network.associateWithTaskManagerAndJobManager(
+        new AkkaActorGateway(jobManager, leaderSessionID),
+        new AkkaActorGateway(self, leaderSessionID)
+      )
+
+
     }
     catch {
       case e: Exception =>
@@ -665,16 +740,18 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       libraryCacheManager = Some(new FallbackLibraryCacheManager)
     }
 
-    currentJobManager = Some(jobManager)
-    instanceID = id
-
     // watch job manager to detect when it dies
     context.watch(jobManager)
 
     // schedule regular heartbeat message for oneself
-    heartbeatScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat)
-       (context.dispatcher))
+    heartbeatScheduler = Some(
+      context.system.scheduler.schedule(
+        TaskManager.HEARTBEAT_INTERVAL,
+        TaskManager.HEARTBEAT_INTERVAL,
+        self,
+        decorateMessage(SendHeartbeat)
+      )(context.dispatcher)
+    )
 
     // notify all the actors that listen for a successful registration
     for (listener <- waitForRegistration) {
@@ -710,7 +787,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
     // de-register from the JobManager (faster detection of disconnect)
     currentJobManager foreach {
-      _ ! Disconnect(s"TaskManager ${self.path} is shutting down.")
+      _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is shutting down."))
     }
 
     currentJobManager = None
@@ -748,8 +825,14 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
           // begin attempts to reconnect
           val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
-          self ! TriggerTaskManagerRegistration(jobManagerAkkaURL,
-                               TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline, 1)
+          self ! decorateMessage(
+            TriggerTaskManagerRegistration(
+              currentRegistrationSessionID,
+              jobManagerAkkaURL,
+              TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+              deadline,
+              1)
+          )
         }
         catch {
           // this is pretty bad, it leaves the TaskManager in a state where it cannot
@@ -795,8 +878,21 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
       // create the task. this does not grab any TaskManager resources or download
       // and libraries - the operation does not block
-      val task = new Task(tdd, memoryManager, ioManager, network, bcVarManager,
-                          self, jobManagerActor, config.timeout, libCache, fileCache)
+
+      val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID)
+      val selfGateway = new AkkaActorGateway(self, leaderSessionID)
+
+      val task = new Task(
+        tdd,
+        memoryManager,
+        ioManager,
+        network,
+        bcVarManager,
+        selfGateway,
+        jobManagerGateway,
+        config.timeout,
+        libCache,
+        fileCache)
 
       log.info(s"Received task ${task.getTaskNameWithSubtasks}")
 
@@ -812,12 +908,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       // all good, we kick off the task, which performs its own initialization
       task.startTaskThread()
       
-      sender ! Acknowledge
+      sender ! decorateMessage(Acknowledge)
     }
     catch {
       case t: Throwable => 
         log.error("SubmitTask failed", t)
-        sender ! Failure(t)
+        sender ! decorateMessage(Failure(t))
     }
   }
 
@@ -828,8 +924,9 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
    * @param partitionInfos The descriptor of the intermediate result partitions.
    */
   private def updateTaskInputPartitions(
-         executionId: ExecutionAttemptID,
-         partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]) : Unit = {
+       executionId: ExecutionAttemptID,
+       partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
+    : Unit = {
 
     Option(runningTasks.get(executionId)) match {
       case Some(task) =>
@@ -867,15 +964,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         }
 
         if (errors.isEmpty) {
-          sender ! Acknowledge
+          sender ! decorateMessage(Acknowledge)
         } else {
-          sender ! Failure(new Exception(errors.mkString("\n")))
+          sender ! decorateMessage(Failure(new Exception(errors.mkString("\n"))))
         }
 
       case None =>
         log.debug(s"Discard update for input partitions of task $executionId : " +
           s"task is no longer running.")
-        sender ! Acknowledge
+        sender ! decorateMessage(Acknowledge)
     }
   }
 
@@ -919,9 +1016,16 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         registry.getSnapshot
       }
 
-      self ! UpdateTaskExecutionState(new TaskExecutionState(
-        task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause,
-        accumulators))
+        self ! decorateMessage(
+          UpdateTaskExecutionState(
+            new TaskExecutionState(
+              task.getJobID,
+              task.getExecutionId,
+              task.getExecutionState,
+              task.getFailureCause,
+              accumulators)
+          )
+        )
     }
     else {
       log.error(s"Cannot find task with ID $executionID to unregister.")
@@ -952,7 +1056,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       }
 
        currentJobManager foreach {
-        jm => jm ! Heartbeat(instanceID, metricsReport, accumulatorEvents)
+        jm => jm ! decorateMessage(Heartbeat(instanceID, metricsReport, accumulatorEvents))
       }
     }
     catch {
@@ -972,14 +1076,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
     try {
       val traces = Thread.getAllStackTraces.asScala
-      val stackTraceStr = traces.map(
-        (trace: (Thread, Array[StackTraceElement])) => {
-          val (thread, elements) = trace
+      val stackTraceStr = traces.map {
+        case (thread: Thread, elements: Array[StackTraceElement]) =>
           "Thread: " + thread.getName + '\n' + elements.mkString("\n")
-          })
-        .mkString("\n\n")
+        }.mkString("\n\n")
 
-      recipient ! StackTrace(instanceID, stackTraceStr)
+      recipient ! decorateMessage(StackTrace(instanceID, stackTraceStr))
     }
     catch {
       case e: Exception => log.error("Failed to send stack trace to " + recipient.path, e)
@@ -999,7 +1101,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       "\n" +
       "A fatal error occurred, forcing the TaskManager to shut down: " + message, cause)
 
-    self ! Kill
+    self ! decorateMessage(Kill)
   }
 }
 
@@ -1165,24 +1267,34 @@ object TaskManager {
    *                         Allows to use TaskManager subclasses for example for YARN.
    */
   @throws(classOf[Exception])
-  def selectNetworkInterfaceAndRunTaskManager(configuration: Configuration,
-                                              streamingMode: StreamingMode,
-                                              taskManagerClass: Class[_ <: TaskManager]) : Unit = {
+  def selectNetworkInterfaceAndRunTaskManager(
+      configuration: Configuration,
+      streamingMode: StreamingMode,
+      taskManagerClass: Class[_ <: TaskManager])
+    : Unit = {
 
     val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
 
-    val (taskManagerHostname, actorSystemPort) =
-       selectNetworkInterfaceAndPort(configuration, jobManagerHostname, jobManagerPort)
+    val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(
+      configuration,
+      jobManagerHostname,
+      jobManagerPort)
 
-    runTaskManager(taskManagerHostname, actorSystemPort, configuration,
-                   streamingMode, taskManagerClass)
+    runTaskManager(
+      taskManagerHostname,
+      actorSystemPort,
+      configuration,
+      streamingMode,
+      taskManagerClass)
   }
 
   @throws(classOf[IOException])
   @throws(classOf[IllegalConfigurationException])
-  def selectNetworkInterfaceAndPort(configuration: Configuration,
-                                    jobManagerHostname: String,
-                                    jobManagerPort: Int) : (String, Int) = {
+  def selectNetworkInterfaceAndPort(
+      configuration: Configuration,
+      jobManagerHostname: String,
+      jobManagerPort: Int)
+    : (String, Int) = {
 
     var taskManagerHostname = configuration.getString(
       ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null)
@@ -1243,13 +1355,19 @@ object TaskManager {
    * @param configuration The configuration for the TaskManager.
    */
   @throws(classOf[Exception])
-  def runTaskManager(taskManagerHostname: String,
-                     actorSystemPort: Int, 
-                     configuration: Configuration,
-                     streamingMode: StreamingMode) : Unit = {
+  def runTaskManager(
+      taskManagerHostname: String,
+      actorSystemPort: Int,
+      configuration: Configuration,
+      streamingMode: StreamingMode)
+    : Unit = {
 
-    runTaskManager(taskManagerHostname, actorSystemPort, configuration,
-                   streamingMode, classOf[TaskManager])
+    runTaskManager(
+      taskManagerHostname,
+      actorSystemPort,
+      configuration,
+      streamingMode,
+      classOf[TaskManager])
   }
 
   /**
@@ -1269,11 +1387,13 @@ object TaskManager {
    *                         subclasses for example for YARN.
    */
   @throws(classOf[Exception])
-  def runTaskManager(taskManagerHostname: String,
-                     actorSystemPort: Int,
-                     configuration: Configuration,
-                     streamingMode: StreamingMode,
-                     taskManagerClass: Class[_ <: TaskManager]) : Unit = {
+  def runTaskManager(
+      taskManagerHostname: String,
+      actorSystemPort: Int,
+      configuration: Configuration,
+      streamingMode: StreamingMode,
+      taskManagerClass: Class[_ <: TaskManager])
+    : Unit = {
 
     LOG.info(s"Starting TaskManager in streaming mode $streamingMode")
 
@@ -1282,8 +1402,10 @@ object TaskManager {
     LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort")
 
     val taskManagerSystem = try {
-      val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
-                                               Some((taskManagerHostname, actorSystemPort)))
+      val akkaConfig = AkkaUtils.getAkkaConfig(
+        configuration,
+        Some((taskManagerHostname, actorSystemPort))
+      )
       if (LOG.isDebugEnabled) {
         LOG.debug("Using akka configuration\n " + akkaConfig)
       }
@@ -1307,13 +1429,14 @@ object TaskManager {
     // and the TaskManager actor
     try {
       LOG.info("Starting TaskManager actor")
-      val taskManager = startTaskManagerComponentsAndActor(configuration,
-                                                           taskManagerSystem,
-                                                           taskManagerHostname,
-                                                           Some(TASK_MANAGER_NAME),
-                                                           None, false,
-                                                           streamingMode,
-                                                           taskManagerClass)
+      val taskManager = startTaskManagerComponentsAndActor(
+        configuration,
+        taskManagerSystem,
+        taskManagerHostname,
+        Some(TASK_MANAGER_NAME),
+        None, false,
+        streamingMode,
+        taskManagerClass)
 
       // start a process reaper that watches the JobManager. If the TaskManager actor dies,
       // the process reaper will kill the JVM process (to ensure easy failure detection)
@@ -1433,14 +1556,15 @@ object TaskManager {
       configuredMemory << 20 // megabytes to bytes
     }
     else {
-      val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-                                            ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+      val fraction = configuration.getFloat(
+        ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
       checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
                            ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
                            "MemoryManager fraction of the free memory must be between 0.0 and 1.0")
 
       val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-                                                                                   fraction).toLong
+        fraction).toLong
 
       LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
         s"memory (${relativeMemSize >> 20} MB).")
@@ -1452,10 +1576,11 @@ object TaskManager {
 
     // now start the memory manager
     val memoryManager = try {
-      new DefaultMemoryManager(memorySize,
-                               taskManagerConfig.numberOfSlots,
-                               netConfig.networkBufferSize,
-                               preAllocateMemory)
+      new DefaultMemoryManager(
+        memorySize,
+        taskManagerConfig.numberOfSlots,
+        netConfig.networkBufferSize,
+        preAllocateMemory)
     }
     catch {
       case e: OutOfMemoryError => throw new Exception(
@@ -1472,7 +1597,8 @@ object TaskManager {
     }
 
     // create the actor properties (which define the actor constructor parameters)
-    val tmProps = Props(taskManagerClass,
+    val tmProps = Props(
+      taskManagerClass,
       taskManagerConfig,
       connectionInfo,
       jobManagerAkkaUrl,
@@ -1502,9 +1628,11 @@ object TaskManager {
    * @return The ActorRef to the TaskManager
    */
   @throws(classOf[IOException])
-  def getTaskManagerRemoteReference(taskManagerUrl: String,
-                                    system: ActorSystem,
-                                    timeout: FiniteDuration): ActorRef = {
+  def getTaskManagerRemoteReference(
+      taskManagerUrl: String,
+      system: ActorSystem,
+      timeout: FiniteDuration)
+    : ActorRef = {
     try {
       val future = AkkaUtils.getReference(taskManagerUrl, system, timeout)
       Await.result(future, timeout)
@@ -1536,10 +1664,11 @@ object TaskManager {
    *                  InstanceConnectionInfo, JobManager actor Akka URL).
    */
   @throws(classOf[IllegalArgumentException])
-  def parseTaskManagerConfiguration(configuration: Configuration,
-                                    taskManagerHostname: String,
-                                    localTaskManagerCommunication: Boolean):
-    (TaskManagerConfiguration,
+  def parseTaskManagerConfiguration(
+      configuration: Configuration,
+      taskManagerHostname: String,
+      localTaskManagerCommunication: Boolean)
+    : (TaskManagerConfiguration,
      NetworkEnvironmentConfiguration,
      InstanceConnectionInfo) = {
 
@@ -1579,10 +1708,10 @@ object TaskManager {
       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
     
     val pageSizeNew: Int = configuration.getInteger(
-                                        ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
+      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
     
     val pageSizeOld: Int = configuration.getInteger(
-                                        ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
+      ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
 
     val pageSize: Int =
       if (pageSizeNew != -1) {
@@ -1617,7 +1746,7 @@ object TaskManager {
     val tmpDirs = configuration.getString(
       ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)
-      .split(",|" + File.pathSeparator)
+    .split(",|" + File.pathSeparator)
 
     val nettyConfig = if (localTaskManagerCommunication) {
       None
@@ -1633,7 +1762,10 @@ object TaskManager {
     val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
 
     val networkConfig = NetworkEnvironmentConfiguration(
-      numNetworkBuffers, pageSize, ioMode, nettyConfig)
+      numNetworkBuffers,
+      pageSize,
+      ioMode,
+      nettyConfig)
 
     // ----> timeouts, library caching, profiling
 
@@ -1667,8 +1799,12 @@ object TaskManager {
         e)
     }
 
-    val taskManagerConfig = TaskManagerConfiguration(tmpDirs, cleanupInterval, timeout,
-      finiteRegistratioDuration, slots,
+    val taskManagerConfig = TaskManagerConfiguration(
+      tmpDirs,
+      cleanupInterval,
+      timeout,
+      finiteRegistratioDuration,
+      slots,
       configuration)
 
     (taskManagerConfig, networkConfig, connectionInfo)
@@ -1720,10 +1856,12 @@ object TaskManager {
    * @throws IllegalConfigurationException Thrown if the condition is violated.
    */
   @throws(classOf[IllegalConfigurationException])
-  private def checkConfigParameter(condition: Boolean,
-                                   parameter: Any,
-                                   name: String,
-                                   errorMessage: String = ""): Unit = {
+  private def checkConfigParameter(
+      condition: Boolean,
+      parameter: Any,
+      name: String,
+      errorMessage: String = "")
+    : Unit = {
     if (!condition) {
       throw new IllegalConfigurationException(
         s"Invalid configuration value for '${name}' : ${parameter} - ${errorMessage}")

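The TaskManager changes above follow one pattern throughout: every outgoing message is passed through decorateMessage, and incoming messages are filtered against the current leaderSessionID by the LeaderSessionMessages mixin. The following is a minimal Scala sketch of that pattern, not the actual Flink traits; LeaderSessionFiltering, LeaderSessionMessage and RequiresLeaderSessionID below are illustrative stand-ins for the runtime types referenced in the diff.

import java.util.UUID
import akka.actor.Actor

// Illustrative stand-ins for the runtime types referenced in the diff above.
case class LeaderSessionMessage(leaderSessionID: Option[UUID], message: Any)
trait RequiresLeaderSessionID

trait LeaderSessionFiltering extends Actor {

  /** Session ID of the JobManager this actor is currently associated with. */
  protected def leaderSessionID: Option[UUID]

  /** The actual message handler, analogous to TaskManager.handleMessage above. */
  protected def handleMessage: Receive

  def receive: Receive = {
    // unwrap and deliver only envelopes carrying the current session ID
    case LeaderSessionMessage(id, msg) =>
      if (id == leaderSessionID && handleMessage.isDefinedAt(msg)) {
        handleMessage(msg)
      } else {
        unhandled(msg) // stale or foreign session: drop the payload
      }

    // a message that requires a session ID must never arrive unwrapped
    case msg: RequiresLeaderSessionID =>
      throw new RuntimeException(
        s"Received $msg without a leader session ID, even though it requires one.")

    // everything else goes straight to the handler
    case msg if handleMessage.isDefinedAt(msg) =>
      handleMessage(msg)
  }

  /** Wrap outgoing messages that need a leader session ID, leave others untouched. */
  protected def decorateMessage(message: Any): Any = message match {
    case _: RequiresLeaderSessionID => LeaderSessionMessage(leaderSessionID, message)
    case _                          => message
  }
}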
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
new file mode 100644
index 0000000..324b014
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FlinkUntypedActorTest {
+
+       private static ActorSystem actorSystem;
+
+       @BeforeClass
+       public static void setup() {
+               actorSystem = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(actorSystem);
+       }
+
+       @Test
+       public void testLeaderSessionMessageFilteringOfFlinkUntypedActor() {
+               final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
+               final Option<UUID> oldSessionID = Option.apply(UUID.randomUUID());
+
+               TestActorRef<PlainFlinkUntypedActor> actor = null;
+
+               try {
+                       actor = TestActorRef.create(
+                                       actorSystem, Props.create(PlainFlinkUntypedActor.class, leaderSessionID));
+
+                       final PlainFlinkUntypedActor underlyingActor = actor.underlyingActor();
+
+                       actor.tell(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 1), ActorRef.noSender());
+                       actor.tell(new JobManagerMessages.LeaderSessionMessage(oldSessionID, 2), ActorRef.noSender());
+                       actor.tell(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 2), ActorRef.noSender());
+                       actor.tell(1, ActorRef.noSender());
+
+                       assertEquals(3, underlyingActor.getMessageCounter());
+
+               } finally {
+                       stopActor(actor);
+               }
+       }
+
+       @Test
+       public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDMessage() {
+               final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
+
+               TestActorRef<PlainFlinkUntypedActor> actor = null;
+
+               try{
+                       final Props props = Props.create(PlainFlinkUntypedActor.class, leaderSessionID);
+                       actor = TestActorRef.create(actorSystem, props);
+
+                       actor.receive(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 1));
+
+                       try {
+                               actor.receive(new PlainRequiresLeaderSessionID());
+
+                               fail("Expected an exception to be thrown, because a RequiresLeaderSessionID" +
+                                               "message was sent without being wrapped in LeaderSessionMessage.");
+                       } catch (Exception e) {
+                               assertEquals("Received a message 
PlainRequiresLeaderSessionID " +
+                                               "without a leader session ID, 
even though it requires to have one.",
+                                               e.getMessage());
+                       }
+
+               } finally {
+                       stopActor(actor);
+               }
+       }
+
+       private static void stopActor(ActorRef actor) {
+               if(actor != null) {
+                       actor.tell(Kill.getInstance(), ActorRef.noSender());
+               }
+       }
+
+
+       static class PlainFlinkUntypedActor extends FlinkUntypedActor {
+
+               private Option<UUID> leaderSessionID;
+
+               private int messageCounter;
+
+               public PlainFlinkUntypedActor(Option<UUID> leaderSessionID) {
+                       this.leaderSessionID = leaderSessionID;
+                       this.messageCounter = 0;
+               }
+
+               @Override
+               protected void handleMessage(Object message) throws Exception {
+                       messageCounter++;
+               }
+
+               @Override
+               protected Option<UUID> getLeaderSessionID() {
+                       return leaderSessionID;
+               }
+
+               public int getMessageCounter() {
+                       return messageCounter;
+               }
+       }
+
+       static class PlainRequiresLeaderSessionID implements RequiresLeaderSessionID {
+               @Override
+               public String toString() {
+                       return "PlainRequiresLeaderSessionID";
+               }
+       }
+}

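The Java test above pins down the filtering contract: wrapped messages are delivered only when the session ID matches, unwrapped messages that require a session ID raise an exception, and plain messages pass through. Assuming the illustrative LeaderSessionFiltering sketch shown after the TaskManager diff, the same behaviour could be exercised from Scala roughly as follows (all names below are hypothetical):

import java.util.UUID
import akka.actor.{ActorSystem, Props}

// Hypothetical message type that opts into session filtering.
case class TriggerSomething(attempt: Int) extends RequiresLeaderSessionID

// Counts every message that survives the session filter.
class CountingActor(id: Option[UUID]) extends LeaderSessionFiltering {
  protected val leaderSessionID: Option[UUID] = id
  var counter = 0
  protected def handleMessage: Receive = {
    case _ => counter += 1
  }
}

object FilteringExample extends App {
  val system = ActorSystem("example")
  val session = Some(UUID.randomUUID())
  val counting = system.actorOf(Props(new CountingActor(session)))

  // wrapped with the current session ID -> delivered and counted
  counting ! LeaderSessionMessage(session, TriggerSomething(1))
  // wrapped with a stale/unknown session ID -> silently dropped by the filter
  counting ! LeaderSessionMessage(Some(UUID.randomUUID()), TriggerSomething(2))
  // plain message without session requirements -> passes through untouched
  counting ! "status report please"

  system.shutdown()
}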
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 932e366..b124304 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -62,18 +60,19 @@ public class CoordinatorShutdownTest {
                        JobGraph testGraph = new JobGraph("test job", vertex);
                        testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
                        
-                       ActorRef jobManager = cluster.getJobManager();
+                       ActorGateway jobManager = cluster.getJobManagerGateway();
 
                        FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
                        JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
                        
                        // submit is successful, but then the job dies because no TaskManager / slot is available
-                       Future<Object> submitFuture = Patterns.ask(jobManager, submitMessage, timeout.toMillis());
+                       Future<Object> submitFuture = jobManager.ask(submitMessage, timeout);
                        Await.result(submitFuture, timeout);
 
                        // get the execution graph and make sure the coordinator is properly shut down
-                       Future<Object> jobRequestFuture = Patterns.ask(jobManager,
-                                       new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout.toMillis());
+                       Future<Object> jobRequestFuture = jobManager.ask(
+                                       new JobManagerMessages.RequestJob(testGraph.getJobID()),
+                                       timeout);
                        
                        ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
                        
@@ -109,18 +108,19 @@ public class CoordinatorShutdownTest {
                        JobGraph testGraph = new JobGraph("test job", vertex);
                        testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
                        
-                       ActorRef jobManager = cluster.getJobManager();
+                       ActorGateway jobManager = cluster.getJobManagerGateway();
 
                        FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
                        JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
 
                        // submit is successful, but then the job dies because no TaskManager / slot is available
-                       Future<Object> submitFuture = Patterns.ask(jobManager, submitMessage, timeout.toMillis());
+                       Future<Object> submitFuture = jobManager.ask(submitMessage, timeout);
                        Await.result(submitFuture, timeout);
 
                        // get the execution graph and make sure the coordinator is properly shut down
-                       Future<Object> jobRequestFuture = Patterns.ask(jobManager,
-                                       new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout.toMillis());
+                       Future<Object> jobRequestFuture = jobManager.ask(
+                                       new JobManagerMessages.RequestJob(testGraph.getJobID()),
+                                       timeout);
 
                        ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
 

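The test changes above swap Patterns.ask(jobManager, message, timeout.toMillis()) for jobManager.ask(message, timeout) on an ActorGateway obtained from the cluster. A rough sketch of the shape such a gateway can take, reusing the stand-in types from the first sketch, is shown below; this is an assumed outline, not Flink's actual AkkaActorGateway:

import java.util.UUID
import akka.actor.ActorRef
import akka.pattern.{ask => akkaAsk}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

// Assumed shape: wraps an ActorRef and decorates outgoing messages with the session ID.
class SessionAwareGateway(actor: ActorRef, leaderSessionID: Option[UUID]) {

  private def decorate(message: Any): Any = message match {
    case m: RequiresLeaderSessionID => LeaderSessionMessage(leaderSessionID, m)
    case m                          => m
  }

  /** Fire-and-forget send with automatic session decoration. */
  def tell(message: Any): Unit = actor ! decorate(message)

  /** Ask pattern with automatic session decoration, as used by the tests above. */
  def ask(message: Any, timeout: FiniteDuration): Future[Any] =
    akkaAsk(actor, decorate(message))(Timeout(timeout))
}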
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index cff7146..e3fc852 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -92,7 +92,7 @@ public class ExecutionGraphDeploymentTest {
                        ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
                        ExecutionVertex vertex = ejv.getTaskVertices()[3];
 
-                       ExecutionGraphTestUtils.SimpleInstanceGateway instanceGateway = new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.directExecutionContext());
+                       ExecutionGraphTestUtils.SimpleActorGateway instanceGateway = new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext());
 
                        final Instance instance = getInstance(instanceGateway);
 
@@ -295,7 +295,7 @@ public class ExecutionGraphDeploymentTest {
                for (int i = 0; i < dop1 + dop2; i++) {
                        scheduler.newInstanceAvailable(
                                        ExecutionGraphTestUtils.getInstance(
-                                                       new ExecutionGraphTestUtils.SimpleInstanceGateway(
+                                                       new ExecutionGraphTestUtils.SimpleActorGateway(
                                                                        TestingUtils.directExecutionContext())));
                }
                assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots());

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 8a63060..64d4c44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -24,18 +24,17 @@ import static org.mockito.Mockito.spy;
 
 import java.lang.reflect.Field;
 import java.net.InetAddress;
-import java.util.LinkedList;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.InstanceGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -101,11 +100,11 @@ public class ExecutionGraphTestUtils {
        //  utility mocking methods
        // --------------------------------------------------------------------------------------------
 
-       public static Instance getInstance(final InstanceGateway gateway) throws Exception {
+       public static Instance getInstance(final ActorGateway gateway) throws Exception {
                return getInstance(gateway, 1);
        }
 
-       public static Instance getInstance(final InstanceGateway gateway, final int numberOfSlots) throws Exception {
+       public static Instance getInstance(final ActorGateway gateway, final int numberOfSlots) throws Exception {
                HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
                InetAddress address = InetAddress.getByName("127.0.0.1");
                InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
@@ -113,10 +112,10 @@ public class ExecutionGraphTestUtils {
                return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots);
        }
 
-       public static class SimpleInstanceGateway extends BaseTestingInstanceGateway {
+       public static class SimpleActorGateway extends BaseTestingActorGateway {
                public TaskDeploymentDescriptor lastTDD;
 
-               public SimpleInstanceGateway(ExecutionContext executionContext){
+               public SimpleActorGateway(ExecutionContext executionContext){
                        super(executionContext);
                }
 
@@ -140,8 +139,8 @@ public class ExecutionGraphTestUtils {
                }
        }
 
-       public static class SimpleFailingInstanceGateway extends BaseTestingInstanceGateway {
-               public SimpleFailingInstanceGateway(ExecutionContext executionContext) {
+       public static class SimpleFailingActorGateway extends BaseTestingActorGateway {
+               public SimpleFailingActorGateway(ExecutionContext executionContext) {
                        super(executionContext);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index f47e92c..89b82f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -63,7 +63,7 @@ public class ExecutionStateProgressTest {
                        // mock resources and mock taskmanager
                        for (ExecutionVertex ee : ejv.getTaskVertices()) {
                                SimpleSlot slot = getInstance(
-                                               new SimpleInstanceGateway(
+                                               new SimpleActorGateway(
                                                                
TestingUtils.defaultExecutionContext())
                                ).allocateSimpleSlot(jid);
                                ee.deployToSlot(slot);

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 9db330b..e9b67af 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -26,9 +26,9 @@ import java.io.IOException;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
-import org.apache.flink.runtime.instance.InstanceGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.api.common.JobID;
@@ -125,12 +125,12 @@ public class ExecutionVertexCancelTest {
                        setVertexState(vertex, ExecutionState.SCHEDULED);
                        assertEquals(ExecutionState.SCHEDULED, 
vertex.getExecutionState());
 
-                       InstanceGateway instanceGateway = new 
CancelSequenceInstanceGateway(
+                       ActorGateway actorGateway = new 
CancelSequenceActorGateway(
                                        executionContext,
                                        new TaskOperationResult(execId, true),
                                        new TaskOperationResult(execId, false));
 
-                       Instance instance = getInstance(instanceGateway);
+                       Instance instance = getInstance(actorGateway);
                        SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        vertex.deployToSlot(slot);
@@ -195,13 +195,13 @@ public class ExecutionVertexCancelTest {
 
                        // task manager cancel sequence mock actor
                        // first return NOT SUCCESS (task not found, cancel 
call overtook deploy call), then success (cancel call after deploy call)
-                       InstanceGateway instanceGateway = new 
CancelSequenceInstanceGateway(
+                       ActorGateway actorGateway = new 
CancelSequenceActorGateway(
                                        executionContext,
                                        new     TaskOperationResult(execId, 
false),
                                        new TaskOperationResult(execId, true)
                        );
 
-                       Instance instance = getInstance(instanceGateway);
+                       Instance instance = getInstance(actorGateway);
                        SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        vertex.deployToSlot(slot);
@@ -258,11 +258,11 @@ public class ExecutionVertexCancelTest {
                                        AkkaUtils.getDefaultTimeout());
                        final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
-                       InstanceGateway instanceGateway = new 
CancelSequenceInstanceGateway(
+                       ActorGateway actorGateway = new 
CancelSequenceActorGateway(
                                        TestingUtils.directExecutionContext(),
                                        new TaskOperationResult(execId, true));
 
-                       Instance instance = getInstance(instanceGateway);
+                       Instance instance = getInstance(actorGateway);
                        SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        setVertexState(vertex, ExecutionState.RUNNING);
@@ -299,12 +299,12 @@ public class ExecutionVertexCancelTest {
                                        AkkaUtils.getDefaultTimeout());
                        final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
-                       final InstanceGateway instanceGateway = new 
CancelSequenceInstanceGateway(
+                       final ActorGateway actorGateway = new 
CancelSequenceActorGateway(
                                        TestingUtils.directExecutionContext(),
                                        new TaskOperationResult(execId, true)
                        );
 
-                       Instance instance = getInstance(instanceGateway);
+                       Instance instance = getInstance(actorGateway);
                        SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        setVertexState(vertex, ExecutionState.RUNNING);
@@ -350,12 +350,12 @@ public class ExecutionVertexCancelTest {
                        final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
 
-                       final InstanceGateway instanceGateway = new 
CancelSequenceInstanceGateway(
+                       final ActorGateway actorGateway = new 
CancelSequenceActorGateway(
                                        TestingUtils.directExecutionContext(),
                                        new TaskOperationResult(execId, false)
                        );
 
-                       Instance instance = getInstance(instanceGateway);
+                       Instance instance = getInstance(actorGateway);
                        SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
                        setVertexState(vertex, ExecutionState.RUNNING);
@@ -386,7 +386,7 @@ public class ExecutionVertexCancelTest {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       final InstanceGateway gateway = new 
CancelSequenceInstanceGateway(TestingUtils.directExecutionContext());
+                       final ActorGateway gateway = new 
CancelSequenceActorGateway(TestingUtils.directExecutionContext());
 
                        Instance instance = getInstance(gateway);
                        SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
@@ -423,7 +423,7 @@ public class ExecutionVertexCancelTest {
                                        AkkaUtils.getDefaultTimeout());
                        final ExecutionAttemptID execID = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
-                       final InstanceGateway gateway = new 
CancelSequenceInstanceGateway(
+                       final ActorGateway gateway = new 
CancelSequenceActorGateway(
                                        TestingUtils.defaultExecutionContext(),
                                        new TaskOperationResult(execID, true));
 
@@ -482,7 +482,7 @@ public class ExecutionVertexCancelTest {
                        // deploying after canceling from CREATED needs to 
raise an exception, because
                        // the scheduler (or any caller) needs to know that the 
slot should be released
                        try {
-                               Instance instance = 
getInstance(DummyInstanceGateway.INSTANCE);
+                               Instance instance = 
getInstance(DummyActorGateway.INSTANCE);
                                SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
 
                                vertex.deployToSlot(slot);
@@ -525,7 +525,7 @@ public class ExecutionVertexCancelTest {
                                                AkkaUtils.getDefaultTimeout());
                                setVertexState(vertex, 
ExecutionState.CANCELING);
 
-                               Instance instance = 
getInstance(DummyInstanceGateway.INSTANCE);
+                               Instance instance = 
getInstance(DummyActorGateway.INSTANCE);
                                SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
 
                                vertex.deployToSlot(slot);
@@ -541,7 +541,7 @@ public class ExecutionVertexCancelTest {
                                ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
                                                AkkaUtils.getDefaultTimeout());
 
-                               Instance instance = 
getInstance(DummyInstanceGateway.INSTANCE);
+                               Instance instance = 
getInstance(DummyActorGateway.INSTANCE);
                                SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
 
                                setVertexResource(vertex, slot);
@@ -562,11 +562,11 @@ public class ExecutionVertexCancelTest {
                }
        }
 
-       public static class CancelSequenceInstanceGateway extends 
BaseTestingInstanceGateway {
+       public static class CancelSequenceActorGateway extends 
BaseTestingActorGateway {
                private final TaskOperationResult[] results;
                private int index = -1;
 
-               public CancelSequenceInstanceGateway(ExecutionContext 
executionContext, TaskOperationResult ... result) {
+               public CancelSequenceActorGateway(ExecutionContext 
executionContext, TaskOperationResult... result) {
                        super(executionContext);
                        this.results = result;
                }
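
For context, the handleMessage body of CancelSequenceActorGateway is outside this hunk; judging from the tests above, which seed it with an ordered list of TaskOperationResults and expect the first reply to differ from the second, it answers successive task messages with those results in order. A minimal sketch of such a sequenced gateway, under that assumption and not necessarily matching the commit's actual implementation:

    // Hypothetical sequenced gateway: answers the i-th message with results[i].
    // Mirrors the constructor shape of CancelSequenceActorGateway above.
    public static class SequencedResultGateway extends BaseTestingActorGateway {
        private final TaskOperationResult[] results;
        private int index = -1;

        public SequencedResultGateway(ExecutionContext executionContext, TaskOperationResult... results) {
            super(executionContext);
            this.results = results;
        }

        @Override
        public Object handleMessage(Object message) throws Exception {
            // hand out the prepared results one per incoming message, so a test
            // can simulate e.g. a cancel call overtaking the deploy call
            index++;
            return results[index];
        }
    }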

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 431c3a9..81ec6c9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -43,7 +43,7 @@ public class ExecutionVertexDeploymentTest {
 
                        // mock taskmanager to simply accept the call
                        Instance instance = getInstance(
-                                       new 
SimpleInstanceGateway(TestingUtils.directExecutionContext()));
+                                       new 
SimpleActorGateway(TestingUtils.directExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
@@ -81,7 +81,7 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid, 
TestingUtils.directExecutionContext());
 
                        final Instance instance = getInstance(
-                                       new 
SimpleInstanceGateway(TestingUtils.directExecutionContext()));
+                                       new 
SimpleActorGateway(TestingUtils.directExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
@@ -124,7 +124,7 @@ public class ExecutionVertexDeploymentTest {
                                        AkkaUtils.getDefaultTimeout());
 
                        final Instance instance = getInstance(
-                                       new 
SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
+                                       new 
SimpleActorGateway(TestingUtils.defaultExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
@@ -171,7 +171,7 @@ public class ExecutionVertexDeploymentTest {
                                        AkkaUtils.getDefaultTimeout());
 
                        final Instance instance = getInstance(
-                                       new 
SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
+                                       new 
SimpleFailingActorGateway(TestingUtils.directExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
@@ -201,7 +201,7 @@ public class ExecutionVertexDeploymentTest {
                                        AkkaUtils.getDefaultTimeout());
 
                        final Instance instance = getInstance(
-                                       new 
SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
+                                       new 
SimpleFailingActorGateway(TestingUtils.directExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
@@ -244,7 +244,7 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       final Instance instance = getInstance(new 
SimpleInstanceGateway(TestingUtils.directExecutionContext()));
+                       final Instance instance = getInstance(new 
SimpleActorGateway(TestingUtils.directExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
@@ -286,7 +286,7 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionAttemptID eid = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
                        final Instance instance = getInstance(
-                                       new 
ExecutionVertexCancelTest.CancelSequenceInstanceGateway(
+                                       new 
ExecutionVertexCancelTest.CancelSequenceActorGateway(
                                                        context,
                                                        new 
TaskOperationResult(eid, false),
                                                        new 
TaskOperationResult(eid, true)));

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 8ea7017..5e9ee33 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.*;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -47,7 +47,7 @@ public class ExecutionVertexSchedulingTest {
                                        AkkaUtils.getDefaultTimeout());
 
                        // a slot than cannot be deployed to
-                       final Instance instance = 
getInstance(DummyInstanceGateway.INSTANCE);
+                       final Instance instance = 
getInstance(DummyActorGateway.INSTANCE);
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
                        
                        slot.releaseSlot();
@@ -77,7 +77,7 @@ public class ExecutionVertexSchedulingTest {
                                        AkkaUtils.getDefaultTimeout());
 
                        // a slot than cannot be deployed to
-                       final Instance instance = 
getInstance(DummyInstanceGateway.INSTANCE);
+                       final Instance instance = 
getInstance(DummyActorGateway.INSTANCE);
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        slot.releaseSlot();
@@ -113,7 +113,7 @@ public class ExecutionVertexSchedulingTest {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       final Instance instance = getInstance(new 
ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
+                       final Instance instance = getInstance(new 
ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        Scheduler scheduler = mock(Scheduler.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index b4a7e63..2530a53 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -377,7 +377,7 @@ public class LocalInputSplitsTest {
                when(connection.getFQDNHostname()).thenReturn(hostname);
                
                return new Instance(
-                               new 
ExecutionGraphTestUtils.SimpleInstanceGateway(
+                               new ExecutionGraphTestUtils.SimpleActorGateway(
                                                
TestingUtils.defaultExecutionContext()),
                                connection,
                                new InstanceID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index b779d79..f42543f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -78,7 +78,7 @@ public class TerminalStateDeadlockTest {
                        InstanceConnectionInfo ci = new 
InstanceConnectionInfo(address, 12345);
                                
                        HardwareDescription resources = new 
HardwareDescription(4, 4000000, 3000000, 2000000);
-                       Instance instance = new 
Instance(DummyInstanceGateway.INSTANCE, ci, new InstanceID(), resources, 4);
+                       Instance instance = new 
Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, 4);
 
                        this.resource = instance.allocateSimpleSlot(new 
JobID());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 3305254..8604b63 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -403,7 +403,7 @@ public class VertexLocationConstraintTest {
                        
                        ExecutionVertex ev = 
eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];
                        
-                       Instance instance = 
ExecutionGraphTestUtils.getInstance(DummyInstanceGateway.INSTANCE);
+                       Instance instance = 
ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE);
                        
ev.setLocationConstraintHosts(Collections.singletonList(instance));
                        
                        assertNotNull(ev.getPreferredLocations());
@@ -435,7 +435,7 @@ public class VertexLocationConstraintTest {
                when(connection.getFQDNHostname()).thenReturn(hostname);
                
                return new Instance(
-                               new 
ExecutionGraphTestUtils.SimpleInstanceGateway(
+                               new ExecutionGraphTestUtils.SimpleActorGateway(
                                                
TestingUtils.defaultExecutionContext()),
                                connection,
                                new InstanceID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
new file mode 100644
index 0000000..2e62781
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract base class for testing {@link ActorGateway} instances. The 
implementing subclass
+ * only has to provide an implementation for handleMessage which contains the 
logic to treat
+ * different messages.
+ */
+abstract public class BaseTestingActorGateway implements ActorGateway {
+       /**
+        * {@link ExecutionContext} which is used to execute the futures.
+        */
+       private final ExecutionContext executionContext;
+
+       public BaseTestingActorGateway(ExecutionContext executionContext) {
+               this.executionContext = executionContext;
+       }
+
+       @Override
+       public Future<Object> ask(Object message, FiniteDuration timeout) {
+               try {
+                       final Object result = handleMessage(message);
+
+                       return Futures.future(new Callable<Object>() {
+                               @Override
+                               public Object call() throws Exception {
+                                       return result;
+                               }
+                       }, executionContext);
+
+               } catch (final Exception e) {
+                       // if an exception occurred in the handleMessage method 
then return it as part of the future
+                       return Futures.future(new Callable<Object>() {
+                               @Override
+                               public Object call() throws Exception {
+                                       throw e;
+                               }
+                       }, executionContext);
+               }
+       }
+
+       /**
+        * Handles the messages supported by this ActorGateway
+        *
+        * @param message Message to handle
+        * @return Result
+        * @throws Exception
+        */
+       abstract public Object handleMessage(Object message) throws Exception;
+
+       @Override
+       public void tell(Object message) {
+               try {
+                       handleMessage(message);
+               } catch (Exception e) {
+                       // discard exception because it happens on the "remote" 
instance
+               }
+       }
+
+       @Override
+       public void tell(Object message, ActorGateway sender) {
+               try{
+                       handleMessage(message);
+               } catch (Exception e) {
+                       // discard exception because it happens on the "remote" 
instance
+               }
+       }
+
+       @Override
+       public void forward(Object message, ActorGateway sender) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Future<Object> retry(Object message, int numberRetries, 
FiniteDuration timeout, ExecutionContext executionContext) {
+               return ask(message, timeout);
+       }
+
+       @Override
+       public String path() {
+               return "BaseTestingActorGateway";
+       }
+
+       @Override
+       public ActorRef actor() {
+               return ActorRef.noSender();
+       }
+
+       @Override
+       public Option<UUID> leaderSessionID() {
+               return Option.empty();
+       }
+}
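
The new BaseTestingActorGateway is meant to be subclassed by tests: handleMessage supplies the per-message response, ask() wraps its return value (or a thrown exception) in a Future on the given ExecutionContext, and tell() invokes it and discards the result. A minimal sketch of such a subclass, using only the API shown above (the EchoActorGateway name is illustrative, not part of this commit):

    package org.apache.flink.runtime.instance;

    import scala.concurrent.ExecutionContext;

    // Hypothetical test gateway that echoes every message back to the caller.
    // Only handleMessage needs to be provided; ask/tell/retry come from the base class.
    public class EchoActorGateway extends BaseTestingActorGateway {

        public EchoActorGateway(ExecutionContext executionContext) {
            super(executionContext);
        }

        @Override
        public Object handleMessage(Object message) throws Exception {
            return message;
        }
    }

A test could then build a mock Instance from it via ExecutionGraphTestUtils.getInstance(new EchoActorGateway(TestingUtils.defaultExecutionContext())), mirroring how SimpleActorGateway is used in the hunks above.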

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
deleted file mode 100644
index e9f8259..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.dispatch.Futures;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.Callable;
-
-/**
- * Abstract base class for testing {@link InstanceGateway} instances. The 
implementing subclass
- * only has to provide an implementation for handleMessage which contains the 
logic to treat
- * different messages.
- */
-abstract public class BaseTestingInstanceGateway implements InstanceGateway {
-       /**
-        * {@link ExecutionContext} which is used to execute the futures.
-        */
-       private final ExecutionContext executionContext;
-
-       public BaseTestingInstanceGateway(ExecutionContext executionContext) {
-               this.executionContext = executionContext;
-       }
-
-       @Override
-       public Future<Object> ask(Object message, FiniteDuration timeout) {
-               try {
-                       final Object result = handleMessage(message);
-
-                       return Futures.future(new Callable<Object>() {
-                               @Override
-                               public Object call() throws Exception {
-                                       return result;
-                               }
-                       }, executionContext);
-
-               } catch (final Exception e) {
-                       // if an exception occurred in the handleMessage method 
then return it as part of the future
-                       return Futures.future(new Callable<Object>() {
-                               @Override
-                               public Object call() throws Exception {
-                                       throw e;
-                               }
-                       }, executionContext);
-               }
-       }
-
-       /**
-        * Handles the supported messages by this InstanceGateway
-        *
-        * @param message Message to handle
-        * @return Result
-        * @throws Exception
-        */
-       abstract public Object handleMessage(Object message) throws Exception;
-
-       @Override
-       public void tell(Object message) {}
-
-       @Override
-       public void forward(Object message, ActorRef sender) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public Future<Object> retry(Object message, int numberRetries, 
FiniteDuration timeout, ExecutionContext executionContext) {
-               return ask(message, timeout);
-       }
-
-       @Override
-       public String path() {
-               throw new UnsupportedOperationException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
new file mode 100644
index 0000000..10762f2
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+/**
+ * Dummy {@link ActorGateway} implementation used for testing.
+ */
+public class DummyActorGateway implements ActorGateway {
+       public static final DummyActorGateway INSTANCE = new 
DummyActorGateway();
+
+       @Override
+       public Future<Object> ask(Object message, FiniteDuration timeout) {
+               return null;
+       }
+
+       @Override
+       public void tell(Object message) {}
+
+       @Override
+       public void tell(Object message, ActorGateway sender) {}
+
+       @Override
+       public void forward(Object message, ActorGateway sender) {}
+
+       @Override
+       public Future<Object> retry(Object message, int numberRetries, 
FiniteDuration timeout, ExecutionContext executionContext) {
+               return null;
+       }
+
+       @Override
+       public String path() {
+               return "DummyActorGateway";
+       }
+
+       @Override
+       public ActorRef actor() {
+               return ActorRef.noSender();
+       }
+
+       @Override
+       public Option<UUID> leaderSessionID() {
+               return Option.<UUID>empty();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
deleted file mode 100644
index 5941201..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Dummy {@link InstanceGateway} implementation used for testing.
- */
-public class DummyInstanceGateway implements InstanceGateway {
-       public static final DummyInstanceGateway INSTANCE = new 
DummyInstanceGateway();
-
-       @Override
-       public Future<Object> ask(Object message, FiniteDuration timeout) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public void tell(Object message) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public void forward(Object message, ActorRef sender) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public Future<Object> retry(Object message, int numberRetries, 
FiniteDuration timeout, ExecutionContext executionContext) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public String path() {
-               return "DummyInstanceGateway";
-       }
-}
