[FLINK-4703] RpcCompletenessTest: Add support for type arguments and subclasses
This closes #2561 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e58ebf2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e58ebf2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e58ebf2 Branch: refs/heads/flip-6 Commit: 6e58ebf22cb11631438ea51118615053e11cbcdb Parents: 415af17 Author: Maximilian Michels <m...@apache.org> Authored: Wed Sep 28 12:39:30 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:42 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/rpc/RpcEndpoint.java | 23 +++++- .../flink/runtime/rpc/RpcCompletenessTest.java | 80 ++++++++++++++++++-- 2 files changed, 94 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6e58ebf2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 4e5e49a..79961f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -85,9 +85,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> { // IMPORTANT: Don't change order of selfGatewayType and self because rpcService.startServer // requires that selfGatewayType has been initialized - this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass()); + this.selfGatewayType = determineSelfGatewayType(); this.self = rpcService.startServer(this); - + this.mainThreadExecutor = new MainThreadExecutor((MainThreadExecutable) self); } @@ -255,4 +255,23 @@ public abstract class RpcEndpoint<C extends RpcGateway> { gateway.runAsync(runnable); } } + + /** + * Determines the self gateway type specified in one of the subclasses which extend this class. + * May traverse multiple class hierarchies until a Gateway type is found as a first type argument. + * @return Class<C> The determined self gateway type + */ + private Class<C> determineSelfGatewayType() { + + // determine self gateway type + Class c = getClass(); + Class<C> determinedSelfGatewayType; + do { + determinedSelfGatewayType = ReflectionUtil.getTemplateType1(c); + // check if super class contains self gateway type in next loop + c = c.getSuperclass(); + } while (!RpcGateway.class.isAssignableFrom(determinedSelfGatewayType)); + + return determinedSelfGatewayType; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6e58ebf2/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index 53355e8..e7143ae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -26,9 +26,14 @@ import org.apache.flink.util.ReflectionUtil; import org.apache.flink.util.TestLogger; import org.junit.Test; import org.reflections.Reflections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.lang.annotation.Annotation; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -41,8 +46,33 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Test which ensures that all classes of subtype {@link RpcEndpoint} implement + * the methods specified in the generic gateway type argument. + * + * {@code + * RpcEndpoint<GatewayTypeParameter extends RpcGateway> + * } + * + * Note, that the class hierarchy can also be nested. In this case the type argument + * always has to be the first argument, e.g. {@code + * + * // RpcClass needs to implement RpcGatewayClass' methods + * RpcClass extends RpcEndpoint<RpcGatewayClass> + * + * // RpcClass2 or its subclass needs to implement RpcGatewayClass' methods + * RpcClass<GatewayTypeParameter extends RpcGateway,...> extends RpcEndpoint<GatewayTypeParameter> + * RpcClass2 extends RpcClass<RpcGatewayClass,...> + * + * // needless to say, this can even be nested further + * ... + * } + * + */ public class RpcCompletenessTest extends TestLogger { + private static Logger LOG = LoggerFactory.getLogger(RpcCompletenessTest.class); + private static final Class<?> futureClass = Future.class; private static final Class<?> timeoutClass = Time.class; @@ -55,16 +85,52 @@ public class RpcCompletenessTest extends TestLogger { Class<? extends RpcEndpoint> c; - for (Class<? extends RpcEndpoint> rpcEndpoint :classes){ + mainloop: + for (Class<? extends RpcEndpoint> rpcEndpoint : classes) { c = rpcEndpoint; - Class<?> rpcGatewayType = ReflectionUtil.getTemplateType1(c); + LOG.debug("-------------"); + LOG.debug("c: {}", c); - if (rpcGatewayType != null) { - checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType); - } else { - fail("Could not retrieve the rpc gateway class for the given rpc endpoint class " + rpcEndpoint.getName()); + // skip abstract classes + if (Modifier.isAbstract(c.getModifiers())) { + LOG.debug("Skipping abstract class"); + continue; } + + // check for type parameter bound to RpcGateway + // skip if one is found because a subclass will provide the concrete argument + TypeVariable<? extends Class<? extends RpcEndpoint>>[] typeParameters = c.getTypeParameters(); + LOG.debug("Checking {} parameters.", typeParameters.length); + for (int i = 0; i < typeParameters.length; i++) { + for (Type bound : typeParameters[i].getBounds()) { + LOG.debug("checking bound {} of type parameter {}", bound, typeParameters[i]); + if (bound.toString().equals("interface " + RpcGateway.class.getName())) { + if (i > 0) { + fail("Type parameter for RpcGateway should come first in " + c); + } + LOG.debug("Skipping class with type parameter bound to RpcGateway."); + // Type parameter is bound to RpcGateway which a subclass will provide + continue mainloop; + } + } + } + + // check if this class or any super class contains the RpcGateway argument + Class<?> rpcGatewayType; + do { + LOG.debug("checking type argument of class: {}", c); + rpcGatewayType = ReflectionUtil.getTemplateType1(c); + LOG.debug("type argument is: {}", rpcGatewayType); + + c = (Class<? extends RpcEndpoint>) c.getSuperclass(); + + } while (!RpcGateway.class.isAssignableFrom(rpcGatewayType)); + + LOG.debug("Checking RRC completeness of endpoint '{}' with gateway '{}'", + rpcEndpoint.getSimpleName(), rpcGatewayType.getSimpleName()); + + checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType); } } @@ -352,7 +418,7 @@ public class RpcCompletenessTest extends TestLogger { */ private List<Method> getRpcMethodsFromGateway(Class<? extends RpcGateway> interfaceClass) { if(!interfaceClass.isInterface()) { - fail(interfaceClass.getName() + "is not a interface"); + fail(interfaceClass.getName() + " is not a interface"); } ArrayList<Method> allMethods = new ArrayList<>();