Resolve Registered Runners that don't end with Runner Resolve runners in a case-insensitive manner.
This reduces duplication in specifying a runner e.g. the DirectRunner can be specified with (among others) any of "--runner=direct", "--runner=directrunner", "--runner=DirectRunner", "--runner=Direct", or "--runner=directRunner" Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e601410b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e601410b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e601410b Branch: refs/heads/master Commit: e601410b6597022c637be27cee86ab080274017a Parents: 4469479 Author: Thomas Groh <tg...@google.com> Authored: Wed Oct 12 13:07:52 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue Oct 18 10:52:59 2016 -0700 ---------------------------------------------------------------------- .../sdk/options/PipelineOptionsFactory.java | 44 +++++++++++++------- .../sdk/options/PipelineOptionsFactoryTest.java | 23 +++++++--- 2 files changed, 48 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e601410b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index cd0c6b2..1c8a835 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -34,6 +34,7 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.ListMultimap; @@ -446,6 +447,7 @@ public class PipelineOptionsFactory { private static final Class<?>[] EMPTY_CLASS_ARRAY = new Class[0]; private static final ObjectMapper MAPPER = new ObjectMapper(); private static final ClassLoader CLASS_LOADER; + private static final Map<String, Class<? extends PipelineRunner<?>>> SUPPORTED_PIPELINE_RUNNERS; /** Classes that are used as the boundary in the stack trace to find the callers class name. */ @@ -514,16 +516,20 @@ public class PipelineOptionsFactory { CLASS_LOADER = findClassLoader(); - // Store the list of all available pipeline runners. - ImmutableMap.Builder<String, Class<? extends PipelineRunner<?>>> builder = - ImmutableMap.builder(); Set<PipelineRunnerRegistrar> pipelineRunnerRegistrars = Sets.newTreeSet(ObjectsClassComparator.INSTANCE); pipelineRunnerRegistrars.addAll( Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, CLASS_LOADER))); + // Store the list of all available pipeline runners. + ImmutableMap.Builder<String, Class<? extends PipelineRunner<?>>> builder = + ImmutableMap.builder(); for (PipelineRunnerRegistrar registrar : pipelineRunnerRegistrars) { for (Class<? extends PipelineRunner<?>> klass : registrar.getPipelineRunners()) { - builder.put(klass.getSimpleName(), klass); + String runnerName = klass.getSimpleName().toLowerCase(); + builder.put(runnerName, klass); + if (runnerName.endsWith("runner")) { + builder.put(runnerName.substring(0, runnerName.length() - "Runner".length()), klass); + } } } SUPPORTED_PIPELINE_RUNNERS = builder.build(); @@ -1420,24 +1426,25 @@ public class PipelineOptionsFactory { JavaType type = MAPPER.getTypeFactory().constructType(method.getGenericReturnType()); if ("runner".equals(entry.getKey())) { String runner = Iterables.getOnlyElement(entry.getValue()); - if (SUPPORTED_PIPELINE_RUNNERS.containsKey(runner)) { - convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner)); + if (SUPPORTED_PIPELINE_RUNNERS.containsKey(runner.toLowerCase())) { + convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner.toLowerCase())); } else { try { Class<?> runnerClass = Class.forName(runner); - checkArgument( - PipelineRunner.class.isAssignableFrom(runnerClass), - "Class '%s' does not implement PipelineRunner. Supported pipeline runners %s", - runner, - Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet())); + if (!(PipelineRunner.class.isAssignableFrom(runnerClass))) { + throw new IllegalArgumentException( + String.format( + "Class '%s' does not implement PipelineRunner. " + + "Supported pipeline runners %s", + runner, getSupportedRunners())); + } convertedOptions.put("runner", runnerClass); } catch (ClassNotFoundException e) { String msg = String.format( "Unknown 'runner' specified '%s', supported pipeline runners %s", - runner, - Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet())); - throw new IllegalArgumentException(msg, e); + runner, getSupportedRunners()); + throw new IllegalArgumentException(msg, e); } } } else if ((returnType.isArray() && (SIMPLE_TYPES.contains(returnType.getComponentType()) @@ -1498,4 +1505,13 @@ public class PipelineOptionsFactory { } return convertedOptions; } + + @VisibleForTesting + static Set<String> getSupportedRunners() { + ImmutableSortedSet.Builder<String> supportedRunners = ImmutableSortedSet.naturalOrder(); + for (Class<? extends PipelineRunner<?>> runner : SUPPORTED_PIPELINE_RUNNERS.values()) { + supportedRunners.add(runner.getSimpleName()); + } + return supportedRunners.build(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e601410b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index a9ec7e4..e12699b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -73,7 +73,22 @@ public class PipelineOptionsFactoryTest { @Test public void testAutomaticRegistrationOfRunners() { assertEquals(REGISTERED_RUNNER, - PipelineOptionsFactory.getRegisteredRunners().get(REGISTERED_RUNNER.getSimpleName())); + PipelineOptionsFactory.getRegisteredRunners() + .get(REGISTERED_RUNNER.getSimpleName().toLowerCase())); + } + + @Test + public void testAutomaticRegistrationInculdesWithoutRunnerSuffix() { + // Sanity check to make sure the substring works appropriately + assertEquals("RegisteredTest", + REGISTERED_RUNNER.getSimpleName() + .substring(0, REGISTERED_RUNNER.getSimpleName().length() - "Runner".length())); + Map<String, Class<? extends PipelineRunner<?>>> registered = + PipelineOptionsFactory.getRegisteredRunners(); + assertEquals(REGISTERED_RUNNER, + registered.get(REGISTERED_RUNNER.getSimpleName() + .toLowerCase() + .substring(0, REGISTERED_RUNNER.getSimpleName().length() - "Runner".length()))); } @Test @@ -927,10 +942,8 @@ public class PipelineOptionsFactoryTest { expectedException.expectMessage( "Unknown 'runner' specified 'UnknownRunner', supported " + "pipeline runners"); Set<String> registeredRunners = PipelineOptionsFactory.getRegisteredRunners().keySet(); - assertThat(registeredRunners, hasItem(REGISTERED_RUNNER.getSimpleName())); - for (String registeredRunner : registeredRunners) { - expectedException.expectMessage(registeredRunner); - } + assertThat(registeredRunners, hasItem(REGISTERED_RUNNER.getSimpleName().toLowerCase())); + expectedException.expectMessage(PipelineOptionsFactory.getSupportedRunners().toString()); PipelineOptionsFactory.fromArgs(args).create(); }