URNs for DirectRunner TransformEvaluator and RootInputProvider

This makes two of the Java DirectRunner's registries (TransformEvaluatorRegistry and RootProviderRegistry) key off URNs instead of Java classes. A root TransformEvaluator requires shards generated by its associated RootInputProvider, so both registries are changed at once.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e29cc52 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e29cc52 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e29cc52 Branch: refs/heads/master Commit: 0e29cc52a3e4b0d9ae5ff3907f10e4e87b734186 Parents: 663ad88 Author: Kenneth Knowles <k...@google.com> Authored: Fri May 19 20:53:32 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu May 25 11:17:08 2017 -0700 ---------------------------------------------------------------------- runners/direct-java/pom.xml | 5 + .../direct/BoundedReadEvaluatorFactory.java | 14 +- .../beam/runners/direct/DirectGroupByKey.java | 21 ++- .../beam/runners/direct/EmptyInputProvider.java | 8 +- .../direct/ParDoMultiOverrideFactory.java | 13 +- .../runners/direct/ReadEvaluatorFactory.java | 97 ++++++++++++++ .../beam/runners/direct/RootInputProvider.java | 7 +- .../runners/direct/RootProviderRegistry.java | 28 ++-- .../apache/beam/runners/direct/SourceShard.java | 33 +++++ .../direct/TestStreamEvaluatorFactory.java | 28 ++-- .../direct/TransformEvaluatorRegistry.java | 128 ++++++++++++++----- .../direct/UnboundedReadEvaluatorFactory.java | 31 +++-- .../runners/direct/ViewOverrideFactory.java | 12 +- .../src/main/resources/beam/findbugs-filter.xml | 7 + 14 files changed, 344 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index bec2113..cba4b09 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -208,6 +208,11 @@ </dependency> <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + + <dependency> <groupId>joda-time</groupId> 
<artifactId>joda-time</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 76db861..fcaaa84 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -33,10 +33,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.runners.direct.StepTransformResult.Builder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -180,16 +180,17 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { } @AutoValue - abstract static class BoundedSourceShard<T> { + abstract static class BoundedSourceShard<T> implements SourceShard<T> { static <T> BoundedSourceShard<T> of(BoundedSource<T> source) { return new AutoValue_BoundedReadEvaluatorFactory_BoundedSourceShard<>(source); } - abstract BoundedSource<T> getSource(); + @Override + public abstract BoundedSource<T> getSource(); } static class InputProvider<T> - implements RootInputProvider<T, BoundedSourceShard<T>, PBegin, Read.Bounded<T>> { + implements RootInputProvider<T, BoundedSourceShard<T>, PBegin> { private 
final EvaluationContext evaluationContext; InputProvider(EvaluationContext evaluationContext) { @@ -198,9 +199,10 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { @Override public Collection<CommittedBundle<BoundedSourceShard<T>>> getInitialInputs( - AppliedPTransform<PBegin, PCollection<T>, Read.Bounded<T>> transform, int targetParallelism) + AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform, + int targetParallelism) throws Exception { - BoundedSource<T> source = transform.getTransform().getSource(); + BoundedSource<T> source = ReadTranslation.boundedSourceFromTransform(transform); PipelineOptions options = evaluationContext.getPipelineOptions(); long estimatedBytes = source.getEstimatedSizeBytes(options); long bytesPerBundle = estimatedBytes / targetParallelism; http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java index 791615a..f239070 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java @@ -20,9 +20,11 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkArgument; +import com.google.protobuf.Message; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.construction.ForwardingPTransform; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import 
org.apache.beam.sdk.coders.IterableCoder; @@ -37,6 +39,9 @@ class DirectGroupByKey<K, V> extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { private final GroupByKey<K, V> original; + static final String DIRECT_GBKO_URN = "urn:beam:directrunner:transforms:gbko:v1"; + static final String DIRECT_GABW_URN = "urn:beam:directrunner:transforms:gabw:v1"; + DirectGroupByKey(GroupByKey<K, V> from) { this.original = from; } @@ -68,7 +73,8 @@ class DirectGroupByKey<K, V> } static final class DirectGroupByKeyOnly<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> { + extends PTransformTranslation.RawPTransform< + PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, Message> { @Override public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) { return PCollection.createPrimitiveOutputInternal( @@ -86,10 +92,16 @@ class DirectGroupByKey<K, V> GroupByKey.getInputValueCoder(input.getCoder()), input.getWindowingStrategy().getWindowFn().windowCoder()); } + + @Override + public String getUrn() { + return DIRECT_GBKO_URN; + } } static final class DirectGroupAlsoByWindow<K, V> - extends PTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> { + extends PTransformTranslation.RawPTransform< + PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, Message> { private final WindowingStrategy<?, ?> inputWindowingStrategy; private final WindowingStrategy<?, ?> outputWindowingStrategy; @@ -135,5 +147,10 @@ class DirectGroupByKey<K, V> return PCollection.createPrimitiveOutputInternal( input.getPipeline(), outputWindowingStrategy, input.isBounded()); } + + @Override + public String getUrn() { + return DIRECT_GABW_URN; + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java ---------------------------------------------------------------------- diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java index c36879a..a5a53bc 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java @@ -20,13 +20,12 @@ package org.apache.beam.runners.direct; import java.util.Collection; import java.util.Collections; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; /** A {@link RootInputProvider} that provides a singleton empty bundle. */ -class EmptyInputProvider<T> - implements RootInputProvider<T, Void, PCollectionList<T>, Flatten.PCollections<T>> { +class EmptyInputProvider<T> implements RootInputProvider<T, Void, PCollectionList<T>> { EmptyInputProvider() {} /** @@ -36,7 +35,8 @@ class EmptyInputProvider<T> */ @Override public Collection<CommittedBundle<Void>> getInitialInputs( - AppliedPTransform<PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>> + AppliedPTransform< + PCollectionList<T>, PCollection<T>, PTransform<PCollectionList<T>, PCollection<T>>> transform, int targetParallelism) { return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index be433db..df2054b 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -19,11 +19,13 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; +import com.google.protobuf.Message; import java.util.Map; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.construction.PTransformReplacements; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -165,8 +167,12 @@ class ParDoMultiOverrideFactory<InputT, OutputT> } } + static final String DIRECT_STATEFUL_PAR_DO_URN = + "urn:beam:directrunner:transforms:stateful_pardo:v1"; + static class StatefulParDo<K, InputT, OutputT> - extends PTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> { + extends PTransformTranslation.RawPTransform< + PCollection<? 
extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, Message> { private final transient MultiOutput<KV<K, InputT>, OutputT> underlyingParDo; private final transient PCollection<KV<K, InputT>> originalInput; @@ -201,6 +207,11 @@ class ParDoMultiOverrideFactory<InputT, OutputT> return outputs; } + + @Override + public String getUrn() { + return DIRECT_STATEFUL_PAR_DO_URN; + } } /** http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ReadEvaluatorFactory.java new file mode 100644 index 0000000..8521706 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ReadEvaluatorFactory.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import java.util.Collection; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.ReadTranslation; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; + +/** + * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} + * for the {@link Read Read} primitives, whether bounded or unbounded. + */ +final class ReadEvaluatorFactory implements TransformEvaluatorFactory { + + final BoundedReadEvaluatorFactory boundedFactory; + final UnboundedReadEvaluatorFactory unboundedFactory; + + public ReadEvaluatorFactory(EvaluationContext context) { + boundedFactory = new BoundedReadEvaluatorFactory(context); + unboundedFactory = new UnboundedReadEvaluatorFactory(context); + } + + @Nullable + @Override + public <InputT> TransformEvaluator<InputT> forApplication( + AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception { + switch (ReadTranslation.sourceIsBounded(application)) { + case BOUNDED: + return boundedFactory.forApplication(application, inputBundle); + case UNBOUNDED: + return unboundedFactory.forApplication(application, inputBundle); + default: + throw new IllegalArgumentException("PCollection is neither bounded nor unbounded?!?"); + } + } + + @Override + public void cleanup() throws Exception { + boundedFactory.cleanup(); + unboundedFactory.cleanup(); + } + + static <T> InputProvider<T> inputProvider(EvaluationContext context) { + return new InputProvider(context); + } + + private static class InputProvider<T> implements RootInputProvider<T, SourceShard<T>, PBegin> { + + private final UnboundedReadEvaluatorFactory.InputProvider<T> unboundedInputProvider; + private final BoundedReadEvaluatorFactory.InputProvider<T> boundedInputProvider; + + 
InputProvider(EvaluationContext context) { + this.unboundedInputProvider = new UnboundedReadEvaluatorFactory.InputProvider<T>(context); + this.boundedInputProvider = new BoundedReadEvaluatorFactory.InputProvider<T>(context); + } + + @Override + public Collection<CommittedBundle<SourceShard<T>>> getInitialInputs( + AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> + appliedTransform, + int targetParallelism) + throws Exception { + switch (ReadTranslation.sourceIsBounded(appliedTransform)) { + case BOUNDED: + // This cast could be made unnecessary, but too much bounded polymorphism + return (Collection) + boundedInputProvider.getInitialInputs(appliedTransform, targetParallelism); + case UNBOUNDED: + // This cast could be made unnecessary, but too much bounded polymorphism + return (Collection) + unboundedInputProvider.getInitialInputs(appliedTransform, targetParallelism); + default: + throw new IllegalArgumentException("PCollection is neither bounded nor unbounded?!?"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java index ce69518..0b3de32 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java @@ -29,8 +29,7 @@ import org.apache.beam.sdk.values.PInput; * Provides {@link CommittedBundle bundles} that will be provided to the {@link PTransform * PTransforms} that are at the root of a {@link Pipeline}. 
*/ -interface RootInputProvider< - T, ShardT, InputT extends PInput, TransformT extends PTransform<InputT, PCollection<T>>> { +interface RootInputProvider<T, ShardT, InputT extends PInput> { /** * Get the initial inputs for the {@link AppliedPTransform}. The {@link AppliedPTransform} will be * provided with these {@link CommittedBundle bundles} as input when the {@link Pipeline} runs. @@ -44,6 +43,8 @@ interface RootInputProvider< * greater than or equal to 1. */ Collection<CommittedBundle<ShardT>> getInitialInputs( - AppliedPTransform<InputT, PCollection<T>, TransformT> transform, int targetParallelism) + AppliedPTransform<InputT, PCollection<T>, PTransform<InputT, PCollection<T>>> + transform, + int targetParallelism) throws Exception; } http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java index 4b0c06d..5cbeab7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java @@ -18,13 +18,14 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.runners.core.construction.PTransformTranslation.FLATTEN_TRANSFORM_URN; +import static org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DIRECT_TEST_STREAM_URN; import com.google.common.collect.ImmutableMap; import java.util.Collection; import java.util.Map; -import org.apache.beam.sdk.io.Read; +import org.apache.beam.runners.core.construction.PTransformTranslation; import 
org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.Flatten.PCollections; import org.apache.beam.sdk.transforms.PTransform; /** @@ -33,34 +34,31 @@ import org.apache.beam.sdk.transforms.PTransform; */ class RootProviderRegistry { public static RootProviderRegistry defaultRegistry(EvaluationContext context) { - ImmutableMap.Builder<Class<? extends PTransform>, RootInputProvider<?, ?, ?, ?>> + ImmutableMap.Builder<String, RootInputProvider<?, ?, ?>> defaultProviders = ImmutableMap.builder(); defaultProviders - .put(Read.Bounded.class, new BoundedReadEvaluatorFactory.InputProvider(context)) - .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory.InputProvider(context)) - .put( - TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class, - new TestStreamEvaluatorFactory.InputProvider(context)) - .put(PCollections.class, new EmptyInputProvider()); + .put(PTransformTranslation.READ_TRANSFORM_URN, ReadEvaluatorFactory.inputProvider(context)) + .put(DIRECT_TEST_STREAM_URN, new TestStreamEvaluatorFactory.InputProvider(context)) + .put(FLATTEN_TRANSFORM_URN, new EmptyInputProvider()); return new RootProviderRegistry(defaultProviders.build()); } - private final Map<Class<? extends PTransform>, RootInputProvider<?, ?, ?, ?>> providers; + private final Map<String, RootInputProvider<?, ?, ?>> providers; private RootProviderRegistry( - Map<Class<? extends PTransform>, RootInputProvider<?, ?, ?, ?>> providers) { + Map<String, RootInputProvider<?, ?, ?>> providers) { this.providers = providers; } public Collection<CommittedBundle<?>> getInitialInputs( AppliedPTransform<?, ?, ?> transform, int targetParallelism) throws Exception { - Class<? 
extends PTransform> transformClass = transform.getTransform().getClass(); + String transformUrn = PTransformTranslation.urnForTransform(transform.getTransform()); RootInputProvider provider = checkNotNull( - providers.get(transformClass), - "Tried to get a %s for a Transform of type %s, but there is no such provider", + providers.get(transformUrn), + "Tried to get a %s for a transform \"%s\", but there is no such provider", RootInputProvider.class.getSimpleName(), - transformClass.getSimpleName()); + transformUrn); return provider.getInitialInputs(transform, targetParallelism); } } http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java new file mode 100644 index 0000000..a054333 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; + +/** + * A shard for a source in the {@link Read} transform. + * + * <p>Since {@link UnboundedSource} and {@link BoundedSource} have radically different needs, this + * is a mostly-empty interface. + */ +interface SourceShard<T> { + Source<T> getSource(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 8b21d5a..b1db58f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -22,12 +22,14 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.common.collect.Iterables; +import com.google.protobuf.Message; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -180,7 +182,10 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { return ReplacementOutputs.singleton(outputs, newOutput); } - static 
class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> { + static final String DIRECT_TEST_STREAM_URN = "urn:beam:directrunner:transforms:test_stream:v1"; + + static class DirectTestStream<T> + extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>, Message> { private final transient DirectRunner runner; private final TestStream<T> original; @@ -197,12 +202,15 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) .setCoder(original.getValueCoder()); } + + @Override + public String getUrn() { + return DIRECT_TEST_STREAM_URN; + } } } - static class InputProvider<T> - implements RootInputProvider< - T, TestStreamIndex<T>, PBegin, DirectTestStreamFactory.DirectTestStream<T>> { + static class InputProvider<T> implements RootInputProvider<T, TestStreamIndex<T>, PBegin> { private final EvaluationContext evaluationContext; InputProvider(EvaluationContext evaluationContext) { @@ -211,15 +219,17 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { @Override public Collection<CommittedBundle<TestStreamIndex<T>>> getInitialInputs( - AppliedPTransform<PBegin, PCollection<T>, DirectTestStreamFactory.DirectTestStream<T>> - transform, + AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform, int targetParallelism) { + + // This will always be run on an execution-time transform, so it can be downcast + DirectTestStreamFactory.DirectTestStream<T> testStream = + (DirectTestStreamFactory.DirectTestStream<T>) transform.getTransform(); + CommittedBundle<TestStreamIndex<T>> initialBundle = evaluationContext .<TestStreamIndex<T>>createRootBundle() - .add( - WindowedValue.valueInGlobalWindow( - TestStreamIndex.of(transform.getTransform().original))) + .add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(testStream.original))) .commit(BoundedWindow.TIMESTAMP_MAX_VALUE); return 
Collections.singleton(initialBundle); } http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 718cca2..d144b20 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -19,23 +19,33 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.runners.core.construction.PTransformTranslation.FLATTEN_TRANSFORM_URN; +import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN; +import static org.apache.beam.runners.core.construction.PTransformTranslation.READ_TRANSFORM_URN; +import static org.apache.beam.runners.core.construction.PTransformTranslation.WINDOW_TRANSFORM_URN; +import static org.apache.beam.runners.core.construction.SplittableParDo.SPLITTABLE_PROCESS_URN; +import static org.apache.beam.runners.direct.DirectGroupByKey.DIRECT_GABW_URN; +import static org.apache.beam.runners.direct.DirectGroupByKey.DIRECT_GBKO_URN; +import static org.apache.beam.runners.direct.ParDoMultiOverrideFactory.DIRECT_STATEFUL_PAR_DO_URN; +import static org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DIRECT_TEST_STREAM_URN; +import static org.apache.beam.runners.direct.ViewOverrideFactory.DIRECT_WRITE_VIEW_URN; +import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import 
java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; -import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; -import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; -import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo; -import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView; -import org.apache.beam.sdk.io.Read; +import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.Flatten.PCollections; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.Window; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,43 +55,93 @@ import org.slf4j.LoggerFactory; */ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class); + public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) { - @SuppressWarnings({"rawtypes"}) - ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives = - ImmutableMap.<Class<? 
extends PTransform>, TransformEvaluatorFactory>builder() - .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt)) - .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt)) + ImmutableMap<String, TransformEvaluatorFactory> primitives = + ImmutableMap.<String, TransformEvaluatorFactory>builder() + // Beam primitives + .put(READ_TRANSFORM_URN, new ReadEvaluatorFactory(ctxt)) .put( - ParDo.MultiOutput.class, + PAR_DO_TRANSFORM_URN, new ParDoEvaluatorFactory<>(ctxt, ParDoEvaluator.defaultRunnerFactory())) - .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt)) - .put(PCollections.class, new FlattenEvaluatorFactory(ctxt)) - .put(WriteView.class, new ViewEvaluatorFactory(ctxt)) - .put(Window.Assign.class, new WindowEvaluatorFactory(ctxt)) - // Runner-specific primitives used in expansion of GroupByKey - .put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory(ctxt)) - .put(DirectGroupAlsoByWindow.class, new GroupAlsoByWindowEvaluatorFactory(ctxt)) - .put( - TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class, - new TestStreamEvaluatorFactory(ctxt)) - // Runner-specific primitive used in expansion of SplittableParDo - .put( - SplittableParDoViaKeyedWorkItems.ProcessElements.class, - new SplittableProcessElementsEvaluatorFactory<>(ctxt)) + .put(FLATTEN_TRANSFORM_URN, new FlattenEvaluatorFactory(ctxt)) + .put(WINDOW_TRANSFORM_URN, new WindowEvaluatorFactory(ctxt)) + + // Runner-specific primitives + .put(DIRECT_WRITE_VIEW_URN, new ViewEvaluatorFactory(ctxt)) + .put(DIRECT_STATEFUL_PAR_DO_URN, new StatefulParDoEvaluatorFactory<>(ctxt)) + .put(DIRECT_GBKO_URN, new GroupByKeyOnlyEvaluatorFactory(ctxt)) + .put(DIRECT_GABW_URN, new GroupAlsoByWindowEvaluatorFactory(ctxt)) + .put(DIRECT_TEST_STREAM_URN, new TestStreamEvaluatorFactory(ctxt)) + + // Runners-core primitives + .put(SPLITTABLE_PROCESS_URN, new SplittableProcessElementsEvaluatorFactory<>(ctxt)) .build(); return new 
TransformEvaluatorRegistry(primitives); } + /** Registers classes specialized to the direct runner. */ + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class DirectTransformsRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map< + ? extends Class<? extends PTransform>, + ? extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder() + .put( + DirectGroupByKey.DirectGroupByKeyOnly.class, + new PTransformTranslation.RawPTransformTranslator<>()) + .put( + DirectGroupByKey.DirectGroupAlsoByWindow.class, + new PTransformTranslation.RawPTransformTranslator()) + .put( + ParDoMultiOverrideFactory.StatefulParDo.class, + new PTransformTranslation.RawPTransformTranslator<>()) + .put( + ViewOverrideFactory.WriteView.class, + new PTransformTranslation.RawPTransformTranslator<>()) + .put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator<>()) + .put( + SplittableParDoViaKeyedWorkItems.ProcessElements.class, + new SplittableParDoProcessElementsTranslator()) + .build(); + } + } + + /** + * A translator just to vend the URN. This will need to be moved to runners-core-construction-java + * once SDF is reorganized appropriately. 
+ */ + private static class SplittableParDoProcessElementsTranslator + implements TransformPayloadTranslator<ProcessElements<?, ?, ?, ?>> { + + private SplittableParDoProcessElementsTranslator() {} + + @Override + public String getUrn(ProcessElements<?, ?, ?, ?> transform) { + return SPLITTABLE_PROCESS_URN; + } + + @Override + public FunctionSpec translate( + AppliedPTransform<?, ?, ProcessElements<?, ?, ?, ?>> transform, SdkComponents components) { + throw new UnsupportedOperationException( + String.format("%s should never be translated", + ProcessElements.class.getCanonicalName())); + } + } + // the TransformEvaluatorFactories can construct instances of all generic types of transform, // so all instances of a primitive can be handled with the same evaluator factory. - @SuppressWarnings("rawtypes") - private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories; + private final Map<String, TransformEvaluatorFactory> factories; private final AtomicBoolean finished = new AtomicBoolean(false); private TransformEvaluatorRegistry( @SuppressWarnings("rawtypes") - Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories) { + Map<String, TransformEvaluatorFactory> factories) { this.factories = factories; } @@ -91,10 +151,12 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { throws Exception { checkState( !finished.get(), "Tried to get an evaluator for a finished TransformEvaluatorRegistry"); - Class<? 
extends PTransform> transformClass = application.getTransform().getClass(); + + String urn = PTransformTranslation.urnForTransform(application.getTransform()); + TransformEvaluatorFactory factory = checkNotNull( - factories.get(transformClass), "No evaluator for PTransform type %s", transformClass); + factories.get(urn), "No evaluator for PTransform \"%s\"", urn); return factory.forApplication(application, inputBundle); } http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index cba826c..7d4bba1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Read.Unbounded; @@ -253,7 +254,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { } @AutoValue - abstract static class UnboundedSourceShard<T, CheckpointT extends CheckpointMark> { + abstract static class UnboundedSourceShard<T, CheckpointT extends CheckpointMark> + implements SourceShard<T> { static <T, CheckpointT extends CheckpointMark> UnboundedSourceShard<T, CheckpointT> unstarted( UnboundedSource<T, CheckpointT> source, UnboundedReadDeduplicator deduplicator) { return of(source, 
deduplicator, null, null); @@ -268,7 +270,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { source, deduplicator, reader, checkpoint); } - abstract UnboundedSource<T, CheckpointT> getSource(); + @Override + public abstract UnboundedSource<T, CheckpointT> getSource(); abstract UnboundedReadDeduplicator getDeduplicator(); @@ -283,9 +286,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { } } - static class InputProvider<OutputT> - implements RootInputProvider< - OutputT, UnboundedSourceShard<OutputT, ?>, PBegin, Unbounded<OutputT>> { + static class InputProvider<T> + implements RootInputProvider<T, UnboundedSourceShard<T, ?>, PBegin> { private final EvaluationContext evaluationContext; InputProvider(EvaluationContext evaluationContext) { @@ -293,27 +295,28 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public Collection<CommittedBundle<UnboundedSourceShard<OutputT, ?>>> getInitialInputs( - AppliedPTransform<PBegin, PCollection<OutputT>, Unbounded<OutputT>> transform, + public Collection<CommittedBundle<UnboundedSourceShard<T, ?>>> getInitialInputs( + AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> + transform, int targetParallelism) throws Exception { - UnboundedSource<OutputT, ?> source = transform.getTransform().getSource(); - List<? extends UnboundedSource<OutputT, ?>> splits = + UnboundedSource<T, ?> source = ReadTranslation.unboundedSourceFromTransform(transform); + List<? extends UnboundedSource<T, ?>> splits = source.split(targetParallelism, evaluationContext.getPipelineOptions()); UnboundedReadDeduplicator deduplicator = source.requiresDeduping() ? 
UnboundedReadDeduplicator.CachedIdDeduplicator.create() : NeverDeduplicator.create(); - ImmutableList.Builder<CommittedBundle<UnboundedSourceShard<OutputT, ?>>> initialShards = + ImmutableList.Builder<CommittedBundle<UnboundedSourceShard<T, ?>>> initialShards = ImmutableList.builder(); - for (UnboundedSource<OutputT, ?> split : splits) { - UnboundedSourceShard<OutputT, ?> shard = + for (UnboundedSource<T, ?> split : splits) { + UnboundedSourceShard<T, ?> shard = UnboundedSourceShard.unstarted(split, deduplicator); initialShards.add( evaluationContext - .<UnboundedSourceShard<OutputT, ?>>createRootBundle() - .add(WindowedValue.<UnboundedSourceShard<OutputT, ?>>valueInGlobalWindow(shard)) + .<UnboundedSourceShard<T, ?>>createRootBundle() + .add(WindowedValue.<UnboundedSourceShard<T, ?>>valueInGlobalWindow(shard)) .commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); } return initialShards.build(); http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java index b3bbac8..501b436 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java @@ -18,10 +18,12 @@ package org.apache.beam.runners.direct; +import com.google.protobuf.Message; import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.construction.ForwardingPTransform; import org.apache.beam.runners.core.construction.PTransformReplacements; +import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.sdk.coders.KvCoder; import 
org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -93,7 +95,7 @@ class ViewOverrideFactory<ElemT, ViewT> * to {@link ViewT}. */ static final class WriteView<ElemT, ViewT> - extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> { + extends RawPTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>, Message> { private final CreatePCollectionView<ElemT, ViewT> og; WriteView(CreatePCollectionView<ElemT, ViewT> og) { @@ -110,5 +112,13 @@ class ViewOverrideFactory<ElemT, ViewT> public PCollectionView<ViewT> getView() { return og.getView(); } + + @Override + public String getUrn() { + return DIRECT_WRITE_VIEW_URN; + } } + + public static final String DIRECT_WRITE_VIEW_URN = + "urn:beam:directrunner:transforms:write_view:v1"; } http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 8ff0cb0..3430750 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -405,4 +405,11 @@ <Bug pattern="NM_CLASS_NOT_EXCEPTION"/> <!-- It is clear from the name that this class holds either StorageObject or IOException. --> </Match> + + <Match> + <Class name="org.apache.beam.runners.direct.ParDoMultiOverrideFactory$StatefulParDo"/> + <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> + <!-- PTransforms do not actually support serialization. --> + </Match> + </FindBugsFilter>