[FLINK-4362] [rpc] Auto generate rpc gateways via Java proxies

This PR introduces a generic AkkaRpcActor which receives rpc calls as a
RpcInvocation message. The RpcInvocation message is generated by the
AkkaInvocationHandler which gets them from automatically generated Java Proxies.

Add documentation for proxy based akka rpc service

Log unknown message type in AkkaRpcActor but do not fail actor

Use ReflectionUtil to extract RpcGateway type from RpcEndpoint

This closes #2357.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5cf6b56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5cf6b56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5cf6b56

Branch: refs/heads/flip-6
Commit: f5cf6b5680ff3e7bc44f2084e2754dbf8e8d5ec0
Parents: 94e0092
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Aug 10 18:42:26 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 11:39:12 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/util/ReflectionUtil.java   |  10 +-
 .../flink/runtime/rpc/MainThreadExecutor.java   |   4 +-
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  22 +-
 .../apache/flink/runtime/rpc/RpcService.java    |   2 +-
 .../flink/runtime/rpc/akka/AkkaGateway.java     |   4 +-
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 226 +++++++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 175 ++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 121 +++++-----
 .../flink/runtime/rpc/akka/BaseAkkaActor.java   |  50 ----
 .../flink/runtime/rpc/akka/BaseAkkaGateway.java |  41 ----
 .../rpc/akka/jobmaster/JobMasterAkkaActor.java  |  58 -----
 .../akka/jobmaster/JobMasterAkkaGateway.java    |  57 -----
 .../runtime/rpc/akka/messages/CallAsync.java    |  41 ++++
 .../rpc/akka/messages/CallableMessage.java      |  33 ---
 .../runtime/rpc/akka/messages/CancelTask.java   |  36 ---
 .../runtime/rpc/akka/messages/ExecuteTask.java  |  36 ---
 .../messages/RegisterAtResourceManager.java     |  36 ---
 .../rpc/akka/messages/RegisterJobMaster.java    |  36 ---
 .../runtime/rpc/akka/messages/RequestSlot.java  |  37 ---
 .../rpc/akka/messages/RpcInvocation.java        |  98 ++++++++
 .../runtime/rpc/akka/messages/RunAsync.java     |  40 ++++
 .../rpc/akka/messages/RunnableMessage.java      |  31 ---
 .../akka/messages/UpdateTaskExecutionState.java |  37 ---
 .../ResourceManagerAkkaActor.java               |  65 ------
 .../ResourceManagerAkkaGateway.java             |  67 ------
 .../taskexecutor/TaskExecutorAkkaActor.java     |  77 -------
 .../taskexecutor/TaskExecutorAkkaGateway.java   |  59 -----
 .../flink/runtime/rpc/jobmaster/JobMaster.java  |   4 +-
 .../rpc/resourcemanager/ResourceManager.java    |   4 +-
 .../runtime/rpc/taskexecutor/TaskExecutor.java  |   4 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  |  50 ++--
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |   4 +-
 .../rpc/taskexecutor/TaskExecutorTest.java      |   2 +-
 33 files changed, 700 insertions(+), 867 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
index fe2d4c0..b851eba 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
@@ -48,6 +48,14 @@ public final class ReflectionUtil {
                return getTemplateType(clazz, 0);
        }
 
+       public static <T> Class<T> getTemplateType1(Type type) {
+               if (type instanceof ParameterizedType) {
+                       return (Class<T>) getTemplateTypes((ParameterizedType) 
type)[0];
+               } else {
+                       throw new IllegalArgumentException();
+               }
+       }
+
        public static <T> Class<T> getTemplateType2(Class<?> clazz) {
                return getTemplateType(clazz, 1);
        }
@@ -123,7 +131,7 @@ public final class ReflectionUtil {
                Class<?>[] types = new 
Class<?>[paramterizedType.getActualTypeArguments().length];
                int i = 0;
                for (Type templateArgument : 
paramterizedType.getActualTypeArguments()) {
-                       assert (templateArgument instanceof Class<?>);
+                       assert templateArgument instanceof Class<?>;
                        types[i++] = (Class<?>) templateArgument;
                }
                return types;

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
index 14b2997..882c1b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -47,9 +47,9 @@ public interface MainThreadExecutor {
         * future will throw a {@link TimeoutException}.
         *
         * @param callable Callable to be executed
-        * @param timeout Timeout for the future to complete
+        * @param callTimeout Timeout for the future to complete
         * @param <V> Return value of the callable
         * @return Future of the callable result
         */
-       <V> Future<V> callAsync(Callable<V> callable, Timeout timeout);
+       <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 0d928a8..aef0803 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rpc;
 
 import akka.util.Timeout;
 
+import org.apache.flink.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +61,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        /** RPC service to be used to start the RPC server and to obtain rpc 
gateways */
        private final RpcService rpcService;
 
+       /** Class of the self gateway */
+       private final Class<C> selfGatewayType;
+
        /** Self gateway which can be used to schedule asynchronous calls on 
yourself */
        private final C self;
 
@@ -70,15 +74,19 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * of the executing rpc server. */
        private final MainThreadExecutionContext mainThreadExecutionContext;
 
-
        /**
         * Initializes the RPC endpoint.
         * 
         * @param rpcService The RPC server that dispatches calls to this RPC 
endpoint. 
         */
-       public RpcEndpoint(RpcService rpcService) {
+       protected RpcEndpoint(final RpcService rpcService) {
                this.rpcService = checkNotNull(rpcService, "rpcService");
+
+               // IMPORTANT: Don't change order of selfGatewayType and self 
because rpcService.startServer
+               // requires that selfGatewayType has been initialized
+               this.selfGatewayType = 
ReflectionUtil.getTemplateType1(getClass());
                this.self = rpcService.startServer(this);
+               
                this.selfAddress = rpcService.getAddress(self);
                this.mainThreadExecutionContext = new 
MainThreadExecutionContext((MainThreadExecutor) self);
        }
@@ -149,6 +157,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        //  Asynchronous executions
        // 
------------------------------------------------------------------------
 
+
        /**
         * Execute the runnable in the main thread of the underlying RPC 
endpoint.
         *
@@ -172,6 +181,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
                return ((MainThreadExecutor) self).callAsync(callable, timeout);
        }
 
+       /**
+        * Returns the class of the self gateway type.
+        *
+        * @return Class of the self gateway type
+        */
+       public final Class<C> getSelfGatewayType() {
+               return selfGatewayType;
+       }
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 90ff7b6..f93be83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -46,7 +46,7 @@ public interface RpcService {
         * @param <C> Type of the self rpc gateway associated with the rpc 
server
         * @return Self gateway to dispatch remote procedure calls to oneself
         */
-       <S extends RpcEndpoint, C extends RpcGateway> C startServer(S 
rpcEndpoint);
+       <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S 
rpcEndpoint);
 
        /**
         * Stop the underlying rpc server of the provided self gateway.

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
index a96a600..a826e7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
@@ -23,7 +23,7 @@ import akka.actor.ActorRef;
 /**
  * Interface for Akka based rpc gateways
  */
-public interface AkkaGateway {
+interface AkkaGateway {
 
-       ActorRef getActorRef();
+       ActorRef getRpcServer();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
new file mode 100644
index 0000000..e8e383a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
+import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.util.BitSet;
+import java.util.concurrent.Callable;
+
+/**
+ * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation 
handler wraps the
+ * rpc in a {@link RpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
+ * executed.
+ */
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutor {
+       private final ActorRef rpcServer;
+
+       // default timeout for asks
+       private final Timeout timeout;
+
+       AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout) {
+               this.rpcServer = Preconditions.checkNotNull(rpcServer);
+               this.timeout = Preconditions.checkNotNull(timeout);
+       }
+
+       @Override
+       public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+               Class<?> declaringClass = method.getDeclaringClass();
+
+               Object result;
+
+               if (declaringClass.equals(AkkaGateway.class) || 
declaringClass.equals(MainThreadExecutor.class) || 
declaringClass.equals(Object.class)) {
+                       result = method.invoke(this, args);
+               } else {
+                       String methodName = method.getName();
+                       Class<?>[] parameterTypes = method.getParameterTypes();
+                       Annotation[][] parameterAnnotations = 
method.getParameterAnnotations();
+                       Timeout futureTimeout = 
extractRpcTimeout(parameterAnnotations, args, timeout);
+
+                       Tuple2<Class<?>[], Object[]> filteredArguments = 
filterArguments(
+                               parameterTypes,
+                               parameterAnnotations,
+                               args);
+
+                       RpcInvocation rpcInvocation = new RpcInvocation(
+                               methodName,
+                               filteredArguments.f0,
+                               filteredArguments.f1);
+
+                       Class<?> returnType = method.getReturnType();
+
+                       if (returnType.equals(Void.TYPE)) {
+                               rpcServer.tell(rpcInvocation, 
ActorRef.noSender());
+
+                               result = null;
+                       } else if (returnType.equals(Future.class)) {
+                               // execute an asynchronous call
+                               result = Patterns.ask(rpcServer, rpcInvocation, 
futureTimeout);
+                       } else {
+                               // execute a synchronous call
+                               Future<?> futureResult = 
Patterns.ask(rpcServer, rpcInvocation, futureTimeout);
+                               FiniteDuration duration = timeout.duration();
+
+                               result = Await.result(futureResult, duration);
+                       }
+               }
+
+               return result;
+       }
+
+       @Override
+       public ActorRef getRpcServer() {
+               return rpcServer;
+       }
+
+       @Override
+       public void runAsync(Runnable runnable) {
+               // Unfortunately I couldn't find a way to allow only local 
communication. Therefore, the
+               // runnable field is transient transient
+               rpcServer.tell(new RunAsync(runnable), ActorRef.noSender());
+       }
+
+       @Override
+       public <V> Future<V> callAsync(Callable<V> callable, Timeout 
callTimeout) {
+               // Unfortunately I couldn't find a way to allow only local 
communication. Therefore, the
+               // callable field is declared transient
+               @SuppressWarnings("unchecked")
+               Future<V> result = (Future<V>) Patterns.ask(rpcServer, new 
CallAsync(callable), callTimeout);
+
+               return result;
+       }
+
+       /**
+        * Extracts the {@link RpcTimeout} annotated rpc timeout value from the 
list of given method
+        * arguments. If no {@link RpcTimeout} annotated parameter could be 
found, then the default
+        * timeout is returned.
+        *
+        * @param parameterAnnotations Parameter annotations
+        * @param args Array of arguments
+        * @param defaultTimeout Default timeout to return if no {@link 
RpcTimeout} annotated parameter
+        *                       has been found
+        * @return Timeout extracted from the array of arguments or the default 
timeout
+        */
+       private static Timeout extractRpcTimeout(Annotation[][] 
parameterAnnotations, Object[] args, Timeout defaultTimeout) {
+               if (args != null) {
+                       Preconditions.checkArgument(parameterAnnotations.length 
== args.length);
+
+                       for (int i = 0; i < parameterAnnotations.length; i++) {
+                               if (isRpcTimeout(parameterAnnotations[i])) {
+                                       if (args[i] instanceof FiniteDuration) {
+                                               return new 
Timeout((FiniteDuration) args[i]);
+                                       } else {
+                                               throw new RuntimeException("The 
rpc timeout parameter must be of type " +
+                                                       
FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+                                                       " is not supported.");
+                                       }
+                               }
+                       }
+               }
+
+               return defaultTimeout;
+       }
+
+       /**
+        * Removes all {@link RpcTimeout} annotated parameters from the 
parameter type and argument
+        * list.
+        *
+        * @param parameterTypes Array of parameter types
+        * @param parameterAnnotations Array of parameter annotations
+        * @param args Arary of arguments
+        * @return Tuple of filtered parameter types and arguments which no 
longer contain the
+        * {@link RpcTimeout} annotated parameter types and arguments
+        */
+       private static Tuple2<Class<?>[], Object[]> filterArguments(
+               Class<?>[] parameterTypes,
+               Annotation[][] parameterAnnotations,
+               Object[] args) {
+
+               Class<?>[] filteredParameterTypes;
+               Object[] filteredArgs;
+
+               if (args == null) {
+                       filteredParameterTypes = parameterTypes;
+                       filteredArgs = null;
+               } else {
+                       Preconditions.checkArgument(parameterTypes.length == 
parameterAnnotations.length);
+                       Preconditions.checkArgument(parameterAnnotations.length 
== args.length);
+
+                       BitSet isRpcTimeoutParameter = new 
BitSet(parameterTypes.length);
+                       int numberRpcParameters = parameterTypes.length;
+
+                       for (int i = 0; i < parameterTypes.length; i++) {
+                               if (isRpcTimeout(parameterAnnotations[i])) {
+                                       isRpcTimeoutParameter.set(i);
+                                       numberRpcParameters--;
+                               }
+                       }
+
+                       if (numberRpcParameters == parameterTypes.length) {
+                               filteredParameterTypes = parameterTypes;
+                               filteredArgs = args;
+                       } else {
+                               filteredParameterTypes = new 
Class<?>[numberRpcParameters];
+                               filteredArgs = new Object[numberRpcParameters];
+                               int counter = 0;
+
+                               for (int i = 0; i < parameterTypes.length; i++) 
{
+                                       if (!isRpcTimeoutParameter.get(i)) {
+                                               filteredParameterTypes[counter] 
= parameterTypes[i];
+                                               filteredArgs[counter] = args[i];
+                                               counter++;
+                                       }
+                               }
+                       }
+               }
+
+               return Tuple2.of(filteredParameterTypes, filteredArgs);
+       }
+
+       /**
+        * Checks whether any of the annotations is of type {@link RpcTimeout}
+        *
+        * @param annotations Array of annotations
+        * @return True if {@link RpcTimeout} was found; otherwise false
+        */
+       private static boolean isRpcTimeout(Annotation[] annotations) {
+               for (Annotation annotation : annotations) {
+                       if 
(annotation.annotationType().equals(RpcTimeout.class)) {
+                               return true;
+                       }
+               }
+
+               return false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
new file mode 100644
index 0000000..57da38a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import akka.pattern.Patterns;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
+import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+
+/**
+ * Akka rpc actor which receives {@link RpcInvocation}, {@link RunAsync} and 
{@link CallAsync}
+ * messages.
+ * <p>
+ * The {@link RpcInvocation} designates a rpc and is dispatched to the given 
{@link RpcEndpoint}
+ * instance.
+ * <p>
+ * The {@link RunAsync} and {@link CallAsync} messages contain executable code 
which is executed
+ * in the context of the actor thread.
+ *
+ * @param <C> Type of the {@link RpcGateway} associated with the {@link 
RpcEndpoint}
+ * @param <T> Type of the {@link RpcEndpoint}
+ */
+class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends 
UntypedActor {
+       private static final Logger LOG = 
LoggerFactory.getLogger(AkkaRpcActor.class);
+
+       private final T rpcEndpoint;
+
+       AkkaRpcActor(final T rpcEndpoint) {
+               this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint, "rpc 
endpoint");
+       }
+
+       @Override
+       public void onReceive(final Object message)  {
+               if (message instanceof RunAsync) {
+                       handleRunAsync((RunAsync) message);
+               } else if (message instanceof CallAsync) {
+                       handleCallAsync((CallAsync) message);
+               } else if (message instanceof RpcInvocation) {
+                       handleRpcInvocation((RpcInvocation) message);
+               } else {
+                       LOG.warn("Received message of unknown type {}. Dropping 
this message!", message.getClass());
+               }
+       }
+
+       /**
+        * Handle rpc invocations by looking up the rpc method on the rpc 
endpoint and calling this
+        * method with the provided method arguments. If the method has a 
return value, it is returned
+        * to the sender of the call.
+        *
+        * @param rpcInvocation Rpc invocation message
+        */
+       private void handleRpcInvocation(RpcInvocation rpcInvocation) {
+               Method rpcMethod = null;
+
+               try {
+                       rpcMethod = 
lookupRpcMethod(rpcInvocation.getMethodName(), 
rpcInvocation.getParameterTypes());
+               } catch (final NoSuchMethodException e) {
+                       LOG.error("Could not find rpc method for rpc 
invocation: {}.", rpcInvocation, e);
+               }
+
+               if (rpcMethod != null) {
+                       if (rpcMethod.getReturnType().equals(Void.TYPE)) {
+                               // No return value to send back
+                               try {
+                                       rpcMethod.invoke(rpcEndpoint, 
rpcInvocation.getArgs());
+                               } catch (Throwable e) {
+                                       LOG.error("Error while executing remote 
procedure call {}.", rpcMethod, e);
+                               }
+                       } else {
+                               try {
+                                       Object result = 
rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+
+                                       if (result instanceof Future) {
+                                               // pipe result to sender
+                                               Patterns.pipe((Future<?>) 
result, getContext().dispatcher()).to(getSender());
+                                       } else {
+                                               // tell the sender the result 
of the computation
+                                               getSender().tell(new 
Status.Success(result), getSelf());
+                                       }
+                               } catch (Throwable e) {
+                                       // tell the sender about the failure
+                                       getSender().tell(new Status.Failure(e), 
getSelf());
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Handle asynchronous {@link Callable}. This method simply executes 
the given {@link Callable}
+        * in the context of the actor thread.
+        *
+        * @param callAsync Call async message
+        */
+       private void handleCallAsync(CallAsync callAsync) {
+               if (callAsync.getCallable() == null) {
+                       final String result = "Received a " + 
callAsync.getClass().getName() + " message with an empty " +
+                               "callable field. This indicates that this 
message has been serialized " +
+                               "prior to sending the message. The " + 
callAsync.getClass().getName() +
+                               " is only supported with local communication.";
+
+                       LOG.warn(result);
+
+                       getSender().tell(new Status.Failure(new 
Exception(result)), getSelf());
+               } else {
+                       try {
+                               Object result = callAsync.getCallable().call();
+
+                               getSender().tell(new Status.Success(result), 
getSelf());
+                       } catch (Throwable e) {
+                               getSender().tell(new Status.Failure(e), 
getSelf());
+                       }
+               }
+       }
+
+       /**
+        * Handle asynchronous {@link Runnable}. This method simply executes 
the given {@link Runnable}
+        * in the context of the actor thread.
+        *
+        * @param runAsync Run async message
+        */
+       private void handleRunAsync(RunAsync runAsync) {
+               if (runAsync.getRunnable() == null) {
+                       LOG.warn("Received a {} message with an empty runnable 
field. This indicates " +
+                               "that this message has been serialized prior to 
sending the message. The " +
+                               "{} is only supported with local 
communication.",
+                               runAsync.getClass().getName(),
+                               runAsync.getClass().getName());
+               } else {
+                       try {
+                               runAsync.getRunnable().run();
+                       } catch (final Throwable e) {
+                               LOG.error("Caught exception while executing 
runnable in main thread.", e);
+                       }
+               }
+       }
+
+       /**
+        * Look up the rpc method on the given {@link RpcEndpoint} instance.
+        *
+        * @param methodName Name of the method
+        * @param parameterTypes Parameter types of the method
+        * @return Method of the rpc endpoint
+        * @throws NoSuchMethodException
+        */
+       private Method lookupRpcMethod(final String methodName, final 
Class<?>[] parameterTypes) throws NoSuchMethodException {
+               return rpcEndpoint.getClass().getMethod(methodName, 
parameterTypes);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index d55bd13..17983d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -29,88 +29,82 @@ import akka.dispatch.Mapper;
 import akka.pattern.AskableActorSelection;
 import akka.util.Timeout;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaActor;
-import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaGateway;
-import 
org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaActor;
-import 
org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaGateway;
-import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaActor;
-import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaGateway;
-import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
-import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
 import java.util.HashSet;
-import java.util.Set;
 
+/**
+ * Akka based {@link RpcService} implementation. The rpc service starts an 
Akka actor to receive
+ * rpcs from a {@link RpcGateway}.
+ */
 public class AkkaRpcService implements RpcService {
+       private static final Logger LOG = 
LoggerFactory.getLogger(AkkaRpcService.class);
+
        private final ActorSystem actorSystem;
        private final Timeout timeout;
-       private final Set<ActorRef> actors = new HashSet<>();
+       private final Collection<ActorRef> actors = new HashSet<>(4);
 
-       public AkkaRpcService(ActorSystem actorSystem, Timeout timeout) {
-               this.actorSystem = actorSystem;
-               this.timeout = timeout;
+       public AkkaRpcService(final ActorSystem actorSystem, final Timeout 
timeout) {
+               this.actorSystem = Preconditions.checkNotNull(actorSystem, 
"actor system");
+               this.timeout = Preconditions.checkNotNull(timeout, "timeout");
        }
 
        @Override
-       public <C extends RpcGateway> Future<C> connect(String address, final 
Class<C> clazz) {
-               ActorSelection actorSel = actorSystem.actorSelection(address);
+       public <C extends RpcGateway> Future<C> connect(final String address, 
final Class<C> clazz) {
+               LOG.info("Try to connect to remote rpc server with address {}. 
Returning a {} gateway.", address, clazz.getName());
 
-               AskableActorSelection asker = new 
AskableActorSelection(actorSel);
+               final ActorSelection actorSel = 
actorSystem.actorSelection(address);
 
-               Future<Object> identify = asker.ask(new Identify(42), timeout);
+               final AskableActorSelection asker = new 
AskableActorSelection(actorSel);
+
+               final Future<Object> identify = asker.ask(new Identify(42), 
timeout);
 
                return identify.map(new Mapper<Object, C>(){
+                       @Override
                        public C apply(Object obj) {
                                ActorRef actorRef = ((ActorIdentity) 
obj).getRef();
 
-                               if (clazz == TaskExecutorGateway.class) {
-                                       return (C) new 
TaskExecutorAkkaGateway(actorRef, timeout);
-                               } else if (clazz == 
ResourceManagerGateway.class) {
-                                       return (C) new 
ResourceManagerAkkaGateway(actorRef, timeout);
-                               } else if (clazz == JobMasterGateway.class) {
-                                       return (C) new 
JobMasterAkkaGateway(actorRef, timeout);
-                               } else {
-                                       throw new RuntimeException("Could not 
find remote endpoint " + clazz);
-                               }
+                               InvocationHandler akkaInvocationHandler = new 
AkkaInvocationHandler(actorRef, timeout);
+
+                               @SuppressWarnings("unchecked")
+                               C proxy = (C) Proxy.newProxyInstance(
+                                       ClassLoader.getSystemClassLoader(),
+                                       new Class<?>[] {clazz},
+                                       akkaInvocationHandler);
+
+                               return proxy;
                        }
                }, actorSystem.dispatcher());
        }
 
        @Override
-       public <S extends RpcEndpoint, C extends RpcGateway> C startServer(S 
rpcEndpoint) {
-               ActorRef ref;
-               C self;
-               if (rpcEndpoint instanceof TaskExecutor) {
-                       ref = actorSystem.actorOf(
-                               Props.create(TaskExecutorAkkaActor.class, 
rpcEndpoint)
-                       );
-
-                       self = (C) new TaskExecutorAkkaGateway(ref, timeout);
-               } else if (rpcEndpoint instanceof ResourceManager) {
-                       ref = actorSystem.actorOf(
-                               Props.create(ResourceManagerAkkaActor.class, 
rpcEndpoint)
-                       );
-
-                       self = (C) new ResourceManagerAkkaGateway(ref, timeout);
-               } else if (rpcEndpoint instanceof JobMaster) {
-                       ref = actorSystem.actorOf(
-                               Props.create(JobMasterAkkaActor.class, 
rpcEndpoint)
-                       );
-
-                       self = (C) new JobMasterAkkaGateway(ref, timeout);
-               } else {
-                       throw new RuntimeException("Could not start RPC server 
for class " + rpcEndpoint.getClass());
-               }
+       public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S 
rpcEndpoint) {
+               Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint");
 
-               actors.add(ref);
+               LOG.info("Start Akka rpc actor to handle rpcs for {}.", 
rpcEndpoint.getClass().getName());
+
+               Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, 
rpcEndpoint);
+
+               ActorRef actorRef = actorSystem.actorOf(akkaRpcActorProps);
+               actors.add(actorRef);
+
+               InvocationHandler akkaInvocationHandler = new 
AkkaInvocationHandler(actorRef, timeout);
+
+               @SuppressWarnings("unchecked")
+               C self = (C) Proxy.newProxyInstance(
+                       ClassLoader.getSystemClassLoader(),
+                       new Class<?>[]{rpcEndpoint.getSelfGatewayType(), 
MainThreadExecutor.class, AkkaGateway.class},
+                       akkaInvocationHandler);
 
                return self;
        }
@@ -120,16 +114,19 @@ public class AkkaRpcService implements RpcService {
                if (selfGateway instanceof AkkaGateway) {
                        AkkaGateway akkaClient = (AkkaGateway) selfGateway;
 
-                       if (actors.contains(akkaClient.getActorRef())) {
-                               
akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
-                       } else {
-                               // don't stop this actor since it was not 
started by this RPC service
+                       if (actors.contains(akkaClient.getRpcServer())) {
+                               ActorRef selfActorRef = 
akkaClient.getRpcServer();
+
+                               LOG.info("Stop Akka rpc actor {}.", 
selfActorRef.path());
+
+                               selfActorRef.tell(PoisonPill.getInstance(), 
ActorRef.noSender());
                        }
                }
        }
 
        @Override
        public void stopService() {
+               LOG.info("Stop Akka rpc service.");
                actorSystem.shutdown();
                actorSystem.awaitTermination();
        }
@@ -137,9 +134,11 @@ public class AkkaRpcService implements RpcService {
        @Override
        public <C extends RpcGateway> String getAddress(C selfGateway) {
                if (selfGateway instanceof AkkaGateway) {
-                       return AkkaUtils.getAkkaURL(actorSystem, ((AkkaGateway) 
selfGateway).getActorRef());
+                       ActorRef actorRef = ((AkkaGateway) 
selfGateway).getRpcServer();
+                       return AkkaUtils.getAkkaURL(actorSystem, actorRef);
                } else {
-                       throw new RuntimeException("Cannot get address for non 
" + AkkaGateway.class.getName() + ".");
+                       String className = AkkaGateway.class.getName();
+                       throw new RuntimeException("Cannot get address for non 
" + className + '.');
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
deleted file mode 100644
index 3cb499c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka;
-
-import akka.actor.Status;
-import akka.actor.UntypedActor;
-import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
-import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BaseAkkaActor extends UntypedActor {
-       private static final Logger LOG = 
LoggerFactory.getLogger(BaseAkkaActor.class);
-
-       @Override
-       public void onReceive(Object message) throws Exception {
-               if (message instanceof RunnableMessage) {
-                       try {
-                               ((RunnableMessage) message).getRunnable().run();
-                       } catch (Exception e) {
-                               LOG.error("Encountered error while executing 
runnable.", e);
-                       }
-               } else if (message instanceof CallableMessage<?>) {
-                       try {
-                               Object result = ((CallableMessage<?>) 
message).getCallable().call();
-                               sender().tell(new Status.Success(result), 
getSelf());
-                       } catch (Exception e) {
-                               sender().tell(new Status.Failure(e), getSelf());
-                       }
-               } else {
-                       throw new RuntimeException("Unknown message " + 
message);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
deleted file mode 100644
index 512790d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka;
-
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
-import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
-import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
-import scala.concurrent.Future;
-
-import java.util.concurrent.Callable;
-
-public abstract class BaseAkkaGateway implements MainThreadExecutor, 
AkkaGateway {
-       @Override
-       public void runAsync(Runnable runnable) {
-               getActorRef().tell(new RunnableMessage(runnable), 
ActorRef.noSender());
-       }
-
-       @Override
-       public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
-               return (Future<V>) Patterns.ask(getActorRef(), new 
CallableMessage(callable), timeout);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
deleted file mode 100644
index 9e04ea9..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.jobmaster;
-
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
-import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
-
-public class JobMasterAkkaActor extends BaseAkkaActor {
-       private final JobMaster jobMaster;
-
-       public JobMasterAkkaActor(JobMaster jobMaster) {
-               this.jobMaster = jobMaster;
-       }
-
-       @Override
-       public void onReceive(Object message) throws Exception {
-               if (message instanceof UpdateTaskExecutionState) {
-
-                       final ActorRef sender = getSender();
-
-                       UpdateTaskExecutionState updateTaskExecutionState = 
(UpdateTaskExecutionState) message;
-
-                       try {
-                               Acknowledge result = 
jobMaster.updateTaskExecutionState(updateTaskExecutionState.getTaskExecutionState());
-                               sender.tell(new Status.Success(result), 
getSelf());
-                       } catch (Exception e) {
-                               sender.tell(new Status.Failure(e), getSelf());
-                       }
-               } else if (message instanceof RegisterAtResourceManager) {
-                       RegisterAtResourceManager registerAtResourceManager = 
(RegisterAtResourceManager) message;
-
-                       
jobMaster.registerAtResourceManager(registerAtResourceManager.getAddress());
-               } else {
-                       super.onReceive(message);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
deleted file mode 100644
index e6bf061..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.jobmaster;
-
-import akka.actor.ActorRef;
-import akka.pattern.AskableActorRef;
-import akka.util.Timeout;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
-import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
-import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import scala.concurrent.Future;
-import scala.reflect.ClassTag$;
-
-public class JobMasterAkkaGateway extends BaseAkkaGateway implements 
JobMasterGateway {
-       private final AskableActorRef actorRef;
-       private final Timeout timeout;
-
-       public JobMasterAkkaGateway(ActorRef actorRef, Timeout timeout) {
-               this.actorRef = new AskableActorRef(actorRef);
-               this.timeout = timeout;
-       }
-
-       @Override
-       public Future<Acknowledge> updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
-               return actorRef.ask(new 
UpdateTaskExecutionState(taskExecutionState), timeout)
-                       
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
-       }
-
-       @Override
-       public void registerAtResourceManager(String address) {
-               actorRef.actorRef().tell(new 
RegisterAtResourceManager(address), actorRef.actorRef());
-       }
-
-       @Override
-       public ActorRef getActorRef() {
-               return actorRef.actorRef();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java
new file mode 100644
index 0000000..79b7825
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+
+/**
+ * Message for asynchronous callable invocations
+ */
+public final class CallAsync implements Serializable {
+       private static final long serialVersionUID = 2834204738928484060L;
+
+       private transient Callable<?> callable;
+
+       public CallAsync(Callable<?> callable) {
+               this.callable = Preconditions.checkNotNull(callable);
+       }
+
+       public Callable<?> getCallable() {
+               return callable;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
deleted file mode 100644
index f0e555f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import java.util.concurrent.Callable;
-
-public class CallableMessage<V> {
-       private final Callable<V> callable;
-
-       public CallableMessage(Callable<V> callable) {
-               this.callable = callable;
-       }
-
-       public Callable<V> getCallable() {
-               return callable;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
deleted file mode 100644
index 0b9e9dc..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-
-import java.io.Serializable;
-
-public class CancelTask implements Serializable {
-       private static final long serialVersionUID = -2998176874447950595L;
-       private final ExecutionAttemptID executionAttemptID;
-
-       public CancelTask(ExecutionAttemptID executionAttemptID) {
-               this.executionAttemptID = executionAttemptID;
-       }
-
-       public ExecutionAttemptID getExecutionAttemptID() {
-               return executionAttemptID;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
deleted file mode 100644
index a83d539..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-
-import java.io.Serializable;
-
-public class ExecuteTask implements Serializable {
-       private static final long serialVersionUID = -6769958430967048348L;
-       private final TaskDeploymentDescriptor taskDeploymentDescriptor;
-
-       public ExecuteTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
-               this.taskDeploymentDescriptor = taskDeploymentDescriptor;
-       }
-
-       public TaskDeploymentDescriptor getTaskDeploymentDescriptor() {
-               return taskDeploymentDescriptor;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
deleted file mode 100644
index 3ade082..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import java.io.Serializable;
-
-public class RegisterAtResourceManager implements Serializable {
-
-       private static final long serialVersionUID = -4175905742620903602L;
-
-       private final String address;
-
-       public RegisterAtResourceManager(String address) {
-               this.address = address;
-       }
-
-       public String getAddress() {
-               return address;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
deleted file mode 100644
index b35ea38..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
-
-import java.io.Serializable;
-
-public class RegisterJobMaster implements Serializable{
-       private static final long serialVersionUID = -4616879574192641507L;
-       private final JobMasterRegistration jobMasterRegistration;
-
-       public RegisterJobMaster(JobMasterRegistration jobMasterRegistration) {
-               this.jobMasterRegistration = jobMasterRegistration;
-       }
-
-       public JobMasterRegistration getJobMasterRegistration() {
-               return jobMasterRegistration;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
deleted file mode 100644
index 85ceeec..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
-
-import java.io.Serializable;
-
-public class RequestSlot implements Serializable {
-       private static final long serialVersionUID = 7207463889348525866L;
-
-       private final SlotRequest slotRequest;
-
-       public RequestSlot(SlotRequest slotRequest) {
-               this.slotRequest = slotRequest;
-       }
-
-       public SlotRequest getSlotRequest() {
-               return slotRequest;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java
new file mode 100644
index 0000000..5d52ef1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Rpc invocation message containing the remote procedure name, its parameter 
types and the
+ * corresponding call arguments.
+ */
+public final class RpcInvocation implements Serializable {
+       private static final long serialVersionUID = -7058254033460536037L;
+
+       private final String methodName;
+       private final Class<?>[] parameterTypes;
+       private transient Object[] args;
+
+       public RpcInvocation(String methodName, Class<?>[] parameterTypes, 
Object[] args) {
+               this.methodName = Preconditions.checkNotNull(methodName);
+               this.parameterTypes = 
Preconditions.checkNotNull(parameterTypes);
+               this.args = args;
+       }
+
+       public String getMethodName() {
+               return methodName;
+       }
+
+       public Class<?>[] getParameterTypes() {
+               return parameterTypes;
+       }
+
+       public Object[] getArgs() {
+               return args;
+       }
+
+       private void writeObject(ObjectOutputStream oos) throws IOException {
+               oos.defaultWriteObject();
+
+               if (args != null) {
+                       // write has args true
+                       oos.writeBoolean(true);
+
+                       for (int i = 0; i < args.length; i++) {
+                               try {
+                                       oos.writeObject(args[i]);
+                               } catch (IOException e) {
+                                       Class<?> argClass = args[i].getClass();
+
+                                       throw new IOException("Could not write 
" + i + "th argument of method " +
+                                               methodName + ". The argument 
type is " + argClass + ". " +
+                                               "Make sure that this type is 
serializable.", e);
+                               }
+                       }
+               } else {
+                       // write has args false
+                       oos.writeBoolean(false);
+               }
+       }
+
+       private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
+               ois.defaultReadObject();
+
+               boolean hasArgs = ois.readBoolean();
+
+               if (hasArgs) {
+                       int numberArguments = parameterTypes.length;
+
+                       args = new Object[numberArguments];
+
+                       for (int i = 0; i < numberArguments; i++) {
+                               args[i] = ois.readObject();
+                       }
+               } else {
+                       args = null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
new file mode 100644
index 0000000..fb95852
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Message for asynchronous runnable invocations
+ */
+public final class RunAsync implements Serializable {
+       private static final long serialVersionUID = -3080595100695371036L;
+
+       private final transient Runnable runnable;
+
+       public RunAsync(Runnable runnable) {
+               this.runnable = Preconditions.checkNotNull(runnable);
+       }
+
+       public Runnable getRunnable() {
+               return runnable;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
deleted file mode 100644
index 3556738..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-public class RunnableMessage {
-       private final Runnable runnable;
-
-       public RunnableMessage(Runnable runnable) {
-               this.runnable = runnable;
-       }
-
-       public Runnable getRunnable() {
-               return runnable;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
deleted file mode 100644
index f89cd2f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-
-import java.io.Serializable;
-
-public class UpdateTaskExecutionState implements Serializable{
-       private static final long serialVersionUID = -6662229114427331436L;
-
-       private final TaskExecutionState taskExecutionState;
-
-       public UpdateTaskExecutionState(TaskExecutionState taskExecutionState) {
-               this.taskExecutionState = taskExecutionState;
-       }
-
-       public TaskExecutionState getTaskExecutionState() {
-               return taskExecutionState;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
deleted file mode 100644
index 13101f9..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.resourcemanager;
-
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import akka.pattern.Patterns;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
-import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
-import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
-import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
-import scala.concurrent.Future;
-
-public class ResourceManagerAkkaActor extends BaseAkkaActor {
-       private final ResourceManager resourceManager;
-
-       public ResourceManagerAkkaActor(ResourceManager resourceManager) {
-               this.resourceManager = resourceManager;
-       }
-
-       @Override
-       public void onReceive(Object message) throws Exception {
-               final ActorRef sender = getSender();
-
-               if (message instanceof RegisterJobMaster) {
-                       RegisterJobMaster registerJobMaster = 
(RegisterJobMaster) message;
-
-                       try {
-                               Future<RegistrationResponse> response = 
resourceManager.registerJobMaster(registerJobMaster.getJobMasterRegistration());
-                               Patterns.pipe(response, 
getContext().dispatcher()).to(sender());
-                       } catch (Exception e) {
-                               sender.tell(new Status.Failure(e), getSelf());
-                       }
-               } else if (message instanceof RequestSlot) {
-                       RequestSlot requestSlot = (RequestSlot) message;
-
-                       try {
-                               SlotAssignment response = 
resourceManager.requestSlot(requestSlot.getSlotRequest());
-                               sender.tell(new Status.Success(response), 
getSelf());
-                       } catch (Exception e) {
-                               sender.tell(new Status.Failure(e), getSelf());
-                       }
-               } else {
-                       super.onReceive(message);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
deleted file mode 100644
index 1304707..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.resourcemanager;
-
-import akka.actor.ActorRef;
-import akka.pattern.AskableActorRef;
-import akka.util.Timeout;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
-import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
-import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
-import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-public class ResourceManagerAkkaGateway extends BaseAkkaGateway implements 
ResourceManagerGateway {
-       private final AskableActorRef actorRef;
-       private final Timeout timeout;
-
-       public ResourceManagerAkkaGateway(ActorRef actorRef, Timeout timeout) {
-               this.actorRef = new AskableActorRef(actorRef);
-               this.timeout = timeout;
-       }
-
-       @Override
-       public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration, FiniteDuration 
timeout) {
-               return actorRef.ask(new 
RegisterJobMaster(jobMasterRegistration), new Timeout(timeout))
-                       
.mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
-       }
-
-       @Override
-       public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-               return actorRef.ask(new 
RegisterJobMaster(jobMasterRegistration), timeout)
-                       
.mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
-       }
-
-       @Override
-       public Future<SlotAssignment> requestSlot(SlotRequest slotRequest) {
-               return actorRef.ask(new RequestSlot(slotRequest), timeout)
-                       
.mapTo(ClassTag$.MODULE$.<SlotAssignment>apply(SlotAssignment.class));
-       }
-
-       @Override
-       public ActorRef getActorRef() {
-               return actorRef.actorRef();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
deleted file mode 100644
index ed522cc..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.taskexecutor;
-
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import akka.dispatch.OnComplete;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
-import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
-import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
-import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
-
-public class TaskExecutorAkkaActor extends BaseAkkaActor {
-       private final TaskExecutorGateway taskExecutor;
-
-       public TaskExecutorAkkaActor(TaskExecutorGateway taskExecutor) {
-               this.taskExecutor = taskExecutor;
-       }
-
-       @Override
-       public void onReceive(Object message) throws Exception {
-               final ActorRef sender = getSender();
-
-               if (message instanceof ExecuteTask) {
-                       ExecuteTask executeTask = (ExecuteTask) message;
-
-                       
taskExecutor.executeTask(executeTask.getTaskDeploymentDescriptor()).onComplete(
-                               new OnComplete<Acknowledge>() {
-                                       @Override
-                                       public void onComplete(Throwable 
failure, Acknowledge success) throws Throwable {
-                                               if (failure != null) {
-                                                       sender.tell(new 
Status.Failure(failure), getSelf());
-                                               } else {
-                                                       sender.tell(new 
Status.Success(Acknowledge.get()), getSelf());
-                                               }
-                                       }
-                               },
-                               getContext().dispatcher()
-                       );
-               } else if (message instanceof CancelTask) {
-                       CancelTask cancelTask = (CancelTask) message;
-
-                       
taskExecutor.cancelTask(cancelTask.getExecutionAttemptID()).onComplete(
-                               new OnComplete<Acknowledge>() {
-                                       @Override
-                                       public void onComplete(Throwable 
failure, Acknowledge success) throws Throwable {
-                                               if (failure != null) {
-                                                       sender.tell(new 
Status.Failure(failure), getSelf());
-                                               } else {
-                                                       sender.tell(new 
Status.Success(Acknowledge.get()), getSelf());
-                                               }
-                                       }
-                               },
-                               getContext().dispatcher()
-                       );
-               } else {
-                       super.onReceive(message);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
deleted file mode 100644
index 7f0a522..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.taskexecutor;
-
-import akka.actor.ActorRef;
-import akka.pattern.AskableActorRef;
-import akka.util.Timeout;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
-import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
-import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
-import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
-import scala.concurrent.Future;
-import scala.reflect.ClassTag$;
-
-public class TaskExecutorAkkaGateway extends BaseAkkaGateway implements 
TaskExecutorGateway {
-       private final AskableActorRef actorRef;
-       private final Timeout timeout;
-
-       public TaskExecutorAkkaGateway(ActorRef actorRef, Timeout timeout) {
-               this.actorRef = new AskableActorRef(actorRef);
-               this.timeout = timeout;
-       }
-
-       @Override
-       public Future<Acknowledge> executeTask(TaskDeploymentDescriptor 
taskDeploymentDescriptor) {
-               return actorRef.ask(new ExecuteTask(taskDeploymentDescriptor), 
timeout)
-                       
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
-       }
-
-       @Override
-       public Future<Acknowledge> cancelTask(ExecutionAttemptID 
executionAttemptId) {
-               return actorRef.ask(new CancelTask(executionAttemptId), timeout)
-                       
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
-       }
-
-       @Override
-       public ActorRef getActorRef() {
-               return actorRef.actorRef();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
index b81b19c..e53cd68 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.Preconditions;
 import scala.Tuple2;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
@@ -76,7 +77,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
        public JobMaster(RpcService rpcService, ExecutorService 
executorService) {
                super(rpcService);
-               executionContext = 
ExecutionContext$.MODULE$.fromExecutor(executorService);
+               executionContext = ExecutionContext$.MODULE$.fromExecutor(
+                       Preconditions.checkNotNull(executorService));
                scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index c7e8def..729ef0c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.util.Preconditions;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.Future;
@@ -49,7 +50,8 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
 
        public ResourceManager(RpcService rpcService, ExecutorService 
executorService) {
                super(rpcService);
-               this.executionContext = 
ExecutionContext$.MODULE$.fromExecutor(executorService);
+               this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
+                       Preconditions.checkNotNull(executorService));
                this.jobMasterGateways = new HashMap<>();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
index cdfc3bd..3a7dd9f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
 import scala.concurrent.ExecutionContext;
 
 import java.util.HashSet;
@@ -47,7 +48,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
        public TaskExecutor(RpcService rpcService, ExecutorService 
executorService) {
                super(rpcService);
-               this.executionContext = 
ExecutionContexts$.MODULE$.fromExecutor(executorService);
+               this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(
+                       Preconditions.checkNotNull(executorService));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index 0ded25e..e50533e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.util.ReflectionUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.reflections.Reflections;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -51,9 +51,8 @@ public class RpcCompletenessTest extends TestLogger {
 
                for (Class<? extends RpcEndpoint> rpcEndpoint :classes){
                        c = rpcEndpoint;
-                       Type superClass = c.getGenericSuperclass();
 
-                       Class<?> rpcGatewayType = 
extractTypeParameter(superClass, 0);
+                       Class<?> rpcGatewayType = 
ReflectionUtil.getTemplateType1(c);
 
                        if (rpcGatewayType != null) {
                                checkCompleteness(rpcEndpoint, (Class<? extends 
RpcGateway>) rpcGatewayType);
@@ -137,13 +136,16 @@ public class RpcCompletenessTest extends TestLogger {
                }
 
                Annotation[][] parameterAnnotations = 
gatewayMethod.getParameterAnnotations();
+               Class<?>[] parameterTypes = gatewayMethod.getParameterTypes();
                int rpcTimeoutParameters = 0;
 
-               for (Annotation[] parameterAnnotation : parameterAnnotations) {
-                       for (Annotation annotation : parameterAnnotation) {
-                               if (annotation.equals(RpcTimeout.class)) {
-                                       rpcTimeoutParameters++;
-                               }
+               for (int i = 0; i < parameterAnnotations.length; i++) {
+                       if (isRpcTimeout(parameterAnnotations[i])) {
+                               assertTrue(
+                                       "The rpc timeout has to be of type " + 
FiniteDuration.class.getName() + ".",
+                                       
parameterTypes[i].equals(FiniteDuration.class));
+
+                               rpcTimeoutParameters++;
                        }
                }
 
@@ -211,10 +213,10 @@ public class RpcCompletenessTest extends TestLogger {
                                if 
(!futureClass.equals(RpcCompletenessTest.futureClass)) {
                                        return false;
                                } else {
-                                       Class<?> valueClass = 
extractTypeParameter(futureClass, 0);
+                                       Class<?> valueClass = 
ReflectionUtil.getTemplateType1(gatewayMethod.getGenericReturnType());
 
                                        if 
(endpointMethod.getReturnType().equals(futureClass)) {
-                                               Class<?> rpcEndpointValueClass 
= extractTypeParameter(endpointMethod.getReturnType(), 0);
+                                               Class<?> rpcEndpointValueClass 
= ReflectionUtil.getTemplateType1(endpointMethod.getGenericReturnType());
 
                                                // check if we have the same 
future value types
                                                if (valueClass != null && 
rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) 
{
@@ -251,7 +253,7 @@ public class RpcCompletenessTest extends TestLogger {
                if (method.getReturnType().equals(Void.TYPE)) {
                        builder.append("void").append(" ");
                } else if (method.getReturnType().equals(futureClass)) {
-                       Class<?> valueClass = 
extractTypeParameter(method.getGenericReturnType(), 0);
+                       Class<?> valueClass = 
ReflectionUtil.getTemplateType1(method.getGenericReturnType());
 
                        builder
                                .append(futureClass.getSimpleName())
@@ -291,30 +293,6 @@ public class RpcCompletenessTest extends TestLogger {
                return builder.toString();
        }
 
-       private Class<?> extractTypeParameter(Type genericType, int position) {
-               if (genericType instanceof ParameterizedType) {
-                       ParameterizedType parameterizedType = 
(ParameterizedType) genericType;
-
-                       Type[] typeArguments = 
parameterizedType.getActualTypeArguments();
-
-                       if (position < 0 || position >= typeArguments.length) {
-                               throw new IndexOutOfBoundsException("The 
generic type " +
-                                       parameterizedType.getRawType() + " only 
has " + typeArguments.length +
-                                       " type arguments.");
-                       } else {
-                               Type typeArgument = typeArguments[position];
-
-                               if (typeArgument instanceof Class<?>) {
-                                       return (Class<?>) typeArgument;
-                               } else {
-                                       return null;
-                               }
-                       }
-               } else {
-                       return null;
-               }
-       }
-
        private boolean isRpcTimeout(Annotation[] annotations) {
                for (Annotation annotation : annotations) {
                        if 
(annotation.annotationType().equals(RpcTimeout.class)) {

Reply via email to