Resolve Registered Runners that don't end with Runner

Resolve runners in a case-insensitive manner.

This reduces duplication in specifying a runner

e.g. the DirectRunner can be specified with (among others) any of
"--runner=direct", "--runner=directrunner", "--runner=DirectRunner",
"--runner=Direct", or "--runner=directRunner"


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e601410b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e601410b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e601410b

Branch: refs/heads/master
Commit: e601410b6597022c637be27cee86ab080274017a
Parents: 4469479
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 12 13:07:52 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Tue Oct 18 10:52:59 2016 -0700

----------------------------------------------------------------------
 .../sdk/options/PipelineOptionsFactory.java     | 44 +++++++++++++-------
 .../sdk/options/PipelineOptionsFactoryTest.java | 23 +++++++---
 2 files changed, 48 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e601410b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index cd0c6b2..1c8a835 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -34,6 +34,7 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
@@ -446,6 +447,7 @@ public class PipelineOptionsFactory {
   private static final Class<?>[] EMPTY_CLASS_ARRAY = new Class[0];
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final ClassLoader CLASS_LOADER;
+
   private static final Map<String, Class<? extends PipelineRunner<?>>> 
SUPPORTED_PIPELINE_RUNNERS;
 
   /** Classes that are used as the boundary in the stack trace to find the 
callers class name. */
@@ -514,16 +516,20 @@ public class PipelineOptionsFactory {
 
     CLASS_LOADER = findClassLoader();
 
-    // Store the list of all available pipeline runners.
-    ImmutableMap.Builder<String, Class<? extends PipelineRunner<?>>> builder =
-            ImmutableMap.builder();
     Set<PipelineRunnerRegistrar> pipelineRunnerRegistrars =
         Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
     pipelineRunnerRegistrars.addAll(
         Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, 
CLASS_LOADER)));
+    // Store the list of all available pipeline runners.
+    ImmutableMap.Builder<String, Class<? extends PipelineRunner<?>>> builder =
+        ImmutableMap.builder();
     for (PipelineRunnerRegistrar registrar : pipelineRunnerRegistrars) {
       for (Class<? extends PipelineRunner<?>> klass : 
registrar.getPipelineRunners()) {
-        builder.put(klass.getSimpleName(), klass);
+        String runnerName = klass.getSimpleName().toLowerCase();
+        builder.put(runnerName, klass);
+        if (runnerName.endsWith("runner")) {
+          builder.put(runnerName.substring(0, runnerName.length() - 
"Runner".length()), klass);
+        }
       }
     }
     SUPPORTED_PIPELINE_RUNNERS = builder.build();
@@ -1420,24 +1426,25 @@ public class PipelineOptionsFactory {
         JavaType type = 
MAPPER.getTypeFactory().constructType(method.getGenericReturnType());
         if ("runner".equals(entry.getKey())) {
           String runner = Iterables.getOnlyElement(entry.getValue());
-          if (SUPPORTED_PIPELINE_RUNNERS.containsKey(runner)) {
-            convertedOptions.put("runner", 
SUPPORTED_PIPELINE_RUNNERS.get(runner));
+          if (SUPPORTED_PIPELINE_RUNNERS.containsKey(runner.toLowerCase())) {
+            convertedOptions.put("runner", 
SUPPORTED_PIPELINE_RUNNERS.get(runner.toLowerCase()));
           } else {
             try {
               Class<?> runnerClass = Class.forName(runner);
-              checkArgument(
-                  PipelineRunner.class.isAssignableFrom(runnerClass),
-                  "Class '%s' does not implement PipelineRunner. Supported 
pipeline runners %s",
-                  runner,
-                  Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
+              if (!(PipelineRunner.class.isAssignableFrom(runnerClass))) {
+                throw new IllegalArgumentException(
+                    String.format(
+                        "Class '%s' does not implement PipelineRunner. "
+                            + "Supported pipeline runners %s",
+                        runner, getSupportedRunners()));
+              }
               convertedOptions.put("runner", runnerClass);
             } catch (ClassNotFoundException e) {
               String msg =
                   String.format(
                       "Unknown 'runner' specified '%s', supported pipeline 
runners %s",
-                      runner,
-                      Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
-                throw new IllegalArgumentException(msg, e);
+                      runner, getSupportedRunners());
+              throw new IllegalArgumentException(msg, e);
             }
           }
         } else if ((returnType.isArray() && 
(SIMPLE_TYPES.contains(returnType.getComponentType())
@@ -1498,4 +1505,13 @@ public class PipelineOptionsFactory {
     }
     return convertedOptions;
   }
+
+  @VisibleForTesting
+  static Set<String> getSupportedRunners() {
+    ImmutableSortedSet.Builder<String> supportedRunners = 
ImmutableSortedSet.naturalOrder();
+    for (Class<? extends PipelineRunner<?>> runner : 
SUPPORTED_PIPELINE_RUNNERS.values()) {
+      supportedRunners.add(runner.getSimpleName());
+    }
+    return supportedRunners.build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e601410b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index a9ec7e4..e12699b 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -73,7 +73,22 @@ public class PipelineOptionsFactoryTest {
   @Test
   public void testAutomaticRegistrationOfRunners() {
     assertEquals(REGISTERED_RUNNER,
-        
PipelineOptionsFactory.getRegisteredRunners().get(REGISTERED_RUNNER.getSimpleName()));
+        PipelineOptionsFactory.getRegisteredRunners()
+            .get(REGISTERED_RUNNER.getSimpleName().toLowerCase()));
+  }
+
+  @Test
+  public void testAutomaticRegistrationInculdesWithoutRunnerSuffix() {
+    // Sanity check to make sure the substring works appropriately
+    assertEquals("RegisteredTest",
+        REGISTERED_RUNNER.getSimpleName()
+            .substring(0, REGISTERED_RUNNER.getSimpleName().length() - 
"Runner".length()));
+    Map<String, Class<? extends PipelineRunner<?>>> registered =
+        PipelineOptionsFactory.getRegisteredRunners();
+    assertEquals(REGISTERED_RUNNER,
+        registered.get(REGISTERED_RUNNER.getSimpleName()
+            .toLowerCase()
+            .substring(0, REGISTERED_RUNNER.getSimpleName().length() - 
"Runner".length())));
   }
 
   @Test
@@ -927,10 +942,8 @@ public class PipelineOptionsFactoryTest {
     expectedException.expectMessage(
         "Unknown 'runner' specified 'UnknownRunner', supported " + "pipeline 
runners");
     Set<String> registeredRunners = 
PipelineOptionsFactory.getRegisteredRunners().keySet();
-    assertThat(registeredRunners, hasItem(REGISTERED_RUNNER.getSimpleName()));
-    for (String registeredRunner : registeredRunners) {
-      expectedException.expectMessage(registeredRunner);
-    }
+    assertThat(registeredRunners, 
hasItem(REGISTERED_RUNNER.getSimpleName().toLowerCase()));
+    
expectedException.expectMessage(PipelineOptionsFactory.getSupportedRunners().toString());
 
     PipelineOptionsFactory.fromArgs(args).create();
   }

Reply via email to