[BEAM-65] SplittableDoFn prototype. Work in progress. Currently runs only in the direct runner and is not ready for use by real users.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0a24883 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0a24883 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0a24883 Branch: refs/heads/master Commit: a0a24883737850052f54290255780e868c0b63dc Parents: a5d1293 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Thu Aug 11 17:13:53 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Wed Oct 12 17:29:20 2016 -0700 ---------------------------------------------------------------------- runners/core-java/pom.xml | 6 + .../runners/core/ElementAndRestriction.java | 42 ++ .../core/ElementAndRestrictionCoder.java | 67 +++ .../runners/core/GBKIntoKeyedWorkItems.java | 40 ++ .../beam/runners/core/SplittableParDo.java | 469 ++++++++++++++++ .../core/ElementAndRestrictionCoderTest.java | 127 +++++ .../beam/runners/core/SplittableParDoTest.java | 467 ++++++++++++++++ ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 66 +++ .../beam/runners/direct/DirectRunner.java | 5 + .../runners/direct/ParDoOverrideFactory.java | 55 ++ .../beam/runners/direct/SplittableDoFnTest.java | 225 ++++++++ .../beam/sdk/annotations/Experimental.java | 8 +- .../org/apache/beam/sdk/transforms/DoFn.java | 218 +++++++- .../beam/sdk/transforms/DoFnAdapters.java | 27 +- .../apache/beam/sdk/transforms/DoFnTester.java | 117 ++-- .../org/apache/beam/sdk/transforms/ParDo.java | 19 + .../sdk/transforms/reflect/DoFnInvoker.java | 26 +- .../sdk/transforms/reflect/DoFnInvokers.java | 179 +++++- .../sdk/transforms/reflect/DoFnSignature.java | 114 +++- .../sdk/transforms/reflect/DoFnSignatures.java | 366 ++++++++++++- .../splittabledofn/RestrictionTracker.java | 42 ++ .../transforms/splittabledofn/package-info.java | 22 + .../org/apache/beam/sdk/coders/KvCoderTest.java | 99 ++-- .../apache/beam/sdk/transforms/ParDoTest.java | 46 ++ .../transforms/reflect/DoFnInvokersTest.java 
| 261 ++++++++- .../DoFnSignaturesProcessElementTest.java | 4 +- .../DoFnSignaturesSplittableDoFnTest.java | 543 +++++++++++++++++++ 27 files changed, 3495 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index d958dd2..d84c420 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -190,6 +190,12 @@ </dependency> <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>annotations</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java new file mode 100644 index 0000000..4a5d0c4 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * A tuple of an element and a restriction applied to processing it with a + * <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}. + */ +@Experimental(Experimental.Kind.SPLITTABLE_DO_FN) +@AutoValue +public abstract class ElementAndRestriction<ElementT, RestrictionT> { + /** The element to process. */ + public abstract ElementT element(); + + /** The restriction applied to processing the element. */ + public abstract RestrictionT restriction(); + + /** Constructs the {@link ElementAndRestriction}. 
*/ + public static <InputT, RestrictionT> ElementAndRestriction<InputT, RestrictionT> of( + InputT element, RestrictionT restriction) { + return new AutoValue_ElementAndRestriction<>(element, restriction); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java new file mode 100644 index 0000000..6dec8e2 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; + +/** A {@link Coder} for {@link ElementAndRestriction}. */ +@Experimental(Experimental.Kind.SPLITTABLE_DO_FN) +public class ElementAndRestrictionCoder<ElementT, RestrictionT> + extends CustomCoder<ElementAndRestriction<ElementT, RestrictionT>> { + private final Coder<ElementT> elementCoder; + private final Coder<RestrictionT> restrictionCoder; + + /** + * Creates an {@link ElementAndRestrictionCoder} from an element coder and a restriction coder. + */ + public static <ElementT, RestrictionT> ElementAndRestrictionCoder<ElementT, RestrictionT> of( + Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) { + return new ElementAndRestrictionCoder<>(elementCoder, restrictionCoder); + } + + private ElementAndRestrictionCoder( + Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) { + this.elementCoder = elementCoder; + this.restrictionCoder = restrictionCoder; + } + + @Override + public void encode( + ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream, Context context) + throws IOException { + if (value == null) { + throw new CoderException("cannot encode a null ElementAndRestriction"); + } + elementCoder.encode(value.element(), outStream, context.nested()); + restrictionCoder.encode(value.restriction(), outStream, context); + } + + @Override + public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context) + throws IOException { + ElementT key = elementCoder.decode(inStream, context.nested()); + RestrictionT value = restrictionCoder.decode(inStream, context); + return ElementAndRestriction.of(key, value); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java new file mode 100644 index 0000000..ca4d681 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * Interface for creating a runner-specific {@link GroupByKey GroupByKey-like} {@link PTransform} + * that produces {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state + * and timers. 
+ */ +@Experimental(Experimental.Kind.SPLITTABLE_DO_FN) +public class GBKIntoKeyedWorkItems<KeyT, InputT> + extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> { + @Override + public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java new file mode 100644 index 0000000..7645149 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import java.util.UUID; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.ValueState; 
+import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +/** + * A utility transform that executes a <a + * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} by expanding it into a + * network of simpler transforms: + * + * <ol> + * <li>Pair each element with an initial restriction + * <li>Split each restriction into sub-restrictions + * <li>Assign a unique key to each element/restriction pair + * <li>Group by key (so that work is partitioned by key and we can access state/timers) + * <li>Process each keyed element/restriction pair with the splittable {@link DoFn}'s {@link + * DoFn.ProcessElement} method, using state and timers API. + * </ol> + * + * <p>This transform is intended as a helper for internal use by runners when implementing {@code + * ParDo.of(splittable DoFn)}, but not for direct use by pipeline writers. + */ +@Experimental(Experimental.Kind.SPLITTABLE_DO_FN) +public class SplittableParDo< + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> + extends PTransform<PCollection<InputT>, PCollection<OutputT>> { + private final DoFn<InputT, OutputT> fn; + private final DoFnSignature signature; + + /** + * Creates the transform for the given original {@link ParDo} and {@link DoFn}. + * + * @param fn The splittable {@link DoFn} inside the original {@link ParDo} transform. 
+ */ + public SplittableParDo(DoFn<InputT, OutputT> fn) { + checkNotNull(fn, "fn must not be null"); + this.fn = fn; + this.signature = DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()); + checkArgument(signature.processElement().isSplittable(), "fn must be a splittable DoFn"); + } + + @Override + public PCollection<OutputT> apply(PCollection<InputT> input) { + PCollection.IsBounded isFnBounded = signature.isBoundedPerElement(); + Coder<RestrictionT> restrictionCoder = + DoFnInvokers.INSTANCE + .newByteBuddyInvoker(fn) + .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry()); + Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder = + ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder); + + PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> keyedWorkItems = + input + .apply( + "Pair with initial restriction", + ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(fn))) + .setCoder(splitCoder) + .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn))) + .setCoder(splitCoder) + .apply( + "Assign unique key", + WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>())) + .apply( + "Group by key", + new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>()); + checkArgument( + keyedWorkItems.getWindowingStrategy().getWindowFn() instanceof GlobalWindows, + "GBKIntoKeyedWorkItems must produce a globally windowed collection, " + + "but windowing strategy was: %s", + keyedWorkItems.getWindowingStrategy()); + return keyedWorkItems + .apply( + "Process", + ParDo.of( + new ProcessFn<InputT, OutputT, RestrictionT, TrackerT>( + fn, + input.getCoder(), + restrictionCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()))) + .setIsBoundedInternal(input.isBounded().and(isFnBounded)) + .setWindowingStrategyInternal(input.getWindowingStrategy()); + } + + /** + * Assigns a random unique key to each element of the 
input collection, so that the output + * collection is effectively the same elements as input, but the per-key state and timers are now + * effectively per-element. + */ + private static class RandomUniqueKeyFn<T> implements SerializableFunction<T, String> { + @Override + public String apply(T input) { + return UUID.randomUUID().toString(); + } + } + + /** + * Pairs each input element with its initial restriction using the given splittable {@link DoFn}. + */ + private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT> + extends DoFn<InputT, ElementAndRestriction<InputT, RestrictionT>> { + private DoFn<InputT, OutputT> fn; + private transient DoFnInvoker<InputT, OutputT> invoker; + + PairWithRestrictionFn(DoFn<InputT, OutputT> fn) { + this.fn = fn; + } + + @Setup + public void setup() { + invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + } + + @ProcessElement + public void processElement(ProcessContext context) { + context.output( + ElementAndRestriction.of( + context.element(), + invoker.<RestrictionT>invokeGetInitialRestriction(context.element()))); + } + } + + /** + * The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair + * by creating a tracker for the restriction and checkpointing/resuming processing later if + * necessary. + * + * <p>TODO: This uses deprecated OldDoFn since DoFn does not provide access to state/timer + * internals. This should be rewritten to use the <a href="https://s.apache.org/beam-state">State + * and Timers API</a> once it is available. + */ + @VisibleForTesting + static class ProcessFn< + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> + extends OldDoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> { + // Commit at least once every 10k output records. This keeps the watermark advancing + // smoothly, and ensures that not too much work will have to be reprocessed in the event of + // a crash. 
+ // TODO: Also commit at least once every N seconds (runner-specific parameter). + @VisibleForTesting static final int MAX_OUTPUTS_PER_BUNDLE = 10000; + + /** + * The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is + * acquired during the first {@link DoFn.ProcessElement} call for each element and restriction, + * and is released when the {@link DoFn.ProcessElement} call returns {@link + * DoFn.ProcessContinuation#stop}. + * + * <p>A hold is needed to avoid letting the output watermark immediately progress together with + * the input watermark when the first {@link DoFn.ProcessElement} call for this element + * completes. + * + * <p>The hold is updated with the future output watermark reported by ProcessContinuation. + */ + private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag = + StateTags.makeSystemTagInternal( + StateTags.<GlobalWindow>watermarkStateInternal( + "hold", OutputTimeFns.outputAtLatestInputTimestamp())); + + /** + * The state cell containing a copy of the element. Written during the first {@link + * DoFn.ProcessElement} call and read during subsequent calls in response to timer firings, when + * the original element is no longer available. + */ + private final StateTag<Object, ValueState<WindowedValue<InputT>>> elementTag; + + /** + * The state cell containing a restriction representing the unprocessed part of work for this + * element. + */ + private StateTag<Object, ValueState<RestrictionT>> restrictionTag; + + private final DoFn<InputT, OutputT> fn; + private final Coder<? extends BoundedWindow> windowCoder; + + private transient DoFnInvoker<InputT, OutputT> invoker; + + ProcessFn( + DoFn<InputT, OutputT> fn, + Coder<InputT> elementCoder, + Coder<RestrictionT> restrictionCoder, + Coder<? 
extends BoundedWindow> windowCoder) { + this.fn = fn; + this.windowCoder = windowCoder; + elementTag = + StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder)); + DoFnInvoker<InputT, OutputT> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + restrictionTag = StateTags.value("restriction", restrictionCoder); + } + + @Override + public void setup() throws Exception { + invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + } + + @Override + public void processElement(final ProcessContext c) { + // Initialize state (element and restriction) depending on whether this is the seed call. + // The seed call is the first call for this element, which actually has the element. + // Subsequent calls are timer firings and the element has to be retrieved from the state. + TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null); + boolean isSeedCall = (timer == null); + StateNamespace stateNamespace = isSeedCall ? StateNamespaces.global() : timer.getNamespace(); + ValueState<WindowedValue<InputT>> elementState = + c.windowingInternals().stateInternals().state(stateNamespace, elementTag); + ValueState<RestrictionT> restrictionState = + c.windowingInternals().stateInternals().state(stateNamespace, restrictionTag); + WatermarkHoldState<GlobalWindow> holdState = + c.windowingInternals().stateInternals().state(stateNamespace, watermarkHoldTag); + + ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction; + if (isSeedCall) { + // The element and restriction are available in c.element(). 
+ WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue = + Iterables.getOnlyElement(c.element().elementsIterable()); + WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element()); + elementState.write(element); + elementAndRestriction = + ElementAndRestriction.of(element, windowedValue.getValue().restriction()); + } else { + // This is not the first ProcessElement call for this element/restriction - rather, + // this is a timer firing, so we need to fetch the element and restriction from state. + elementState.readLater(); + restrictionState.readLater(); + elementAndRestriction = + ElementAndRestriction.of(elementState.read(), restrictionState.read()); + } + + final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.restriction()); + @SuppressWarnings("unchecked") + final RestrictionT[] residual = (RestrictionT[]) new Object[1]; + // TODO: Only let the call run for a limited amount of time, rather than simply + // producing a limited amount of output. + DoFn.ProcessContinuation cont = + invoker.invokeProcessElement( + makeContext(c, elementAndRestriction.element(), tracker, residual), + wrapTracker(tracker)); + if (residual[0] == null) { + // This means the call completed unsolicited, and the context produced by makeContext() + // did not take a checkpoint. Take one now. + residual[0] = checkNotNull(tracker.checkpoint()); + } + + // Save state for resuming. + if (!cont.shouldResume()) { + // All work for this element/restriction is completed. Clear state and release hold. + elementState.clear(); + restrictionState.clear(); + holdState.clear(); + return; + } + restrictionState.write(residual[0]); + Instant futureOutputWatermark = cont.getWatermark(); + if (futureOutputWatermark != null) { + holdState.add(futureOutputWatermark); + } + // Set a timer to continue processing this element. 
+ TimerInternals timerInternals = c.windowingInternals().timerInternals(); + timerInternals.setTimer( + TimerInternals.TimerData.of( + stateNamespace, + timerInternals.currentProcessingTime().plus(cont.resumeDelay()), + TimeDomain.PROCESSING_TIME)); + } + + private DoFn<InputT, OutputT>.ProcessContext makeContext( + final ProcessContext baseContext, + final WindowedValue<InputT> element, + final TrackerT tracker, + final RestrictionT[] residualRestrictionHolder) { + return fn.new ProcessContext() { + private int numOutputs = 0; + + public InputT element() { + return element.getValue(); + } + + public Instant timestamp() { + return element.getTimestamp(); + } + + public PaneInfo pane() { + return element.getPane(); + } + + public void output(OutputT output) { + baseContext + .windowingInternals() + .outputWindowedValue( + output, element.getTimestamp(), element.getWindows(), element.getPane()); + noteOutput(); + } + + public void outputWithTimestamp(OutputT output, Instant timestamp) { + baseContext + .windowingInternals() + .outputWindowedValue(output, timestamp, element.getWindows(), element.getPane()); + noteOutput(); + } + + private void noteOutput() { + if (++numOutputs >= MAX_OUTPUTS_PER_BUNDLE) { + // Request a checkpoint. The fn *may* produce more output, but hopefully not too much. + residualRestrictionHolder[0] = tracker.checkpoint(); + } + } + + public <T> T sideInput(PCollectionView<T> view) { + return baseContext.sideInput(view); + } + + public PipelineOptions getPipelineOptions() { + return baseContext.getPipelineOptions(); + } + + public <T> void sideOutput(TupleTag<T> tag, T output) { + // TODO: I'm not sure how to implement this correctly: there's no + // "internals.sideOutputWindowedValue". 
+ throw new UnsupportedOperationException( + "Side outputs not yet supported by splittable DoFn"); + } + + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + // TODO: I'm not sure how to implement this correctly: there's no + // "internals.sideOutputWindowedValue". + throw new UnsupportedOperationException( + "Side outputs not yet supported by splittable DoFn"); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( + String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + return fn.createAggregator(name, combiner); + } + }; + } + + /** Creates an {@link DoFn.ExtraContextFactory} that provides just the given tracker. */ + private DoFn.ExtraContextFactory<InputT, OutputT> wrapTracker(final TrackerT tracker) { + return new ExtraContextFactoryForTracker<>(tracker); + } + + private static class ExtraContextFactoryForTracker< + InputT, OutputT, TrackerT extends RestrictionTracker<?>> + implements DoFn.ExtraContextFactory<InputT, OutputT> { + private final TrackerT tracker; + + ExtraContextFactoryForTracker(TrackerT tracker) { + this.tracker = tracker; + } + + @Override + public BoundedWindow window() { + // DoFnSignatures should have verified that this DoFn doesn't access extra context. + throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); + } + + @Override + public DoFn.InputProvider<InputT> inputProvider() { + // DoFnSignatures should have verified that this DoFn doesn't access extra context. + throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); + } + + @Override + public DoFn.OutputReceiver<OutputT> outputReceiver() { + // DoFnSignatures should have verified that this DoFn doesn't access extra context. 
+ throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); + } + + @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + // DoFnSignatures should have verified that this DoFn doesn't access extra context. + throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); + } + + @Override + public TrackerT restrictionTracker() { + return tracker; + } + } + } + + /** Splits the restriction using the given {@link DoFn.SplitRestriction} method. */ + private static class SplitRestrictionFn<InputT, RestrictionT> + extends DoFn< + ElementAndRestriction<InputT, RestrictionT>, + ElementAndRestriction<InputT, RestrictionT>> { + private final DoFn<InputT, ?> splittableFn; + private transient DoFnInvoker<InputT, ?> invoker; + + SplitRestrictionFn(DoFn<InputT, ?> splittableFn) { + this.splittableFn = splittableFn; + } + + @Setup + public void setup() { + invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(splittableFn); + } + + @ProcessElement + public void processElement(final ProcessContext c) { + final InputT element = c.element().element(); + invoker.invokeSplitRestriction( + element, + c.element().restriction(), + new OutputReceiver<RestrictionT>() { + @Override + public void output(RestrictionT part) { + c.output(ElementAndRestriction.of(element, part)); + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java new file mode 100644 index 0000000..f516046 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java @@ -0,0 +1,127 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.CoderUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +/** + * Tests for {@link ElementAndRestrictionCoder}. Parroted from {@link + * org.apache.beam.sdk.coders.KvCoderTest}. 
+ */ +@RunWith(Parameterized.class) +public class ElementAndRestrictionCoderTest<K, V> { + private static class CoderAndData<T> { + Coder<T> coder; + List<T> data; + } + + private static class AnyCoderAndData { + private CoderAndData<?> coderAndData; + } + + private static <T> AnyCoderAndData coderAndData(Coder<T> coder, List<T> data) { + CoderAndData<T> coderAndData = new CoderAndData<>(); + coderAndData.coder = coder; + coderAndData.data = data; + AnyCoderAndData res = new AnyCoderAndData(); + res.coderAndData = coderAndData; + return res; + } + + private static final List<AnyCoderAndData> TEST_DATA = + Arrays.asList( + coderAndData( + VarIntCoder.of(), Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, Integer.MIN_VALUE)), + coderAndData( + BigEndianLongCoder.of(), + Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE)), + coderAndData(StringUtf8Coder.of(), Arrays.asList("", "hello", "goodbye", "1")), + coderAndData( + ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), + Arrays.asList( + ElementAndRestriction.of("", -1), + ElementAndRestriction.of("hello", 0), + ElementAndRestriction.of("goodbye", Integer.MAX_VALUE))), + coderAndData( + ListCoder.of(VarLongCoder.of()), + Arrays.asList(Arrays.asList(1L, 2L, 3L), Collections.<Long>emptyList()))); + + @Parameterized.Parameters(name = "{index}: keyCoder={0} key={1} valueCoder={2} value={3}") + public static Collection<Object[]> data() { + List<Object[]> parameters = new ArrayList<>(); + for (AnyCoderAndData keyCoderAndData : TEST_DATA) { + Coder keyCoder = keyCoderAndData.coderAndData.coder; + for (Object key : keyCoderAndData.coderAndData.data) { + for (AnyCoderAndData valueCoderAndData : TEST_DATA) { + Coder valueCoder = valueCoderAndData.coderAndData.coder; + for (Object value : valueCoderAndData.coderAndData.data) { + parameters.add(new Object[] {keyCoder, key, valueCoder, value}); + } + } + } + } + return parameters; + } + + @Parameter(0) + public Coder<K> keyCoder; + @Parameter(1) + 
public K key; + @Parameter(2) + public Coder<V> valueCoder; + @Parameter(3) + public V value; + + @Test + @SuppressWarnings("rawtypes") + public void testDecodeEncodeEqual() throws Exception { + CoderProperties.coderDecodeEncodeEqual( + ElementAndRestrictionCoder.of(keyCoder, valueCoder), + ElementAndRestriction.of(key, value)); + } + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void encodeNullThrowsCoderException() throws Exception { + thrown.expect(CoderException.class); + thrown.expectMessage("cannot encode a null ElementAndRestriction"); + + CoderUtils.encodeToBase64( + ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java new file mode 100644 index 0000000..a76c4da --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; +import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItems; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link 
SplittableParDo}. */ +@RunWith(JUnit4.class) +public class SplittableParDoTest { + // ----------------- Tests for whether the transform sets boundedness correctly -------------- + private static class SomeRestriction implements Serializable {} + + private static class SomeRestrictionTracker implements RestrictionTracker<SomeRestriction> { + private final SomeRestriction someRestriction = new SomeRestriction(); + + @Override + public SomeRestriction currentRestriction() { + return someRestriction; + } + + @Override + public SomeRestriction checkpoint() { + return someRestriction; + } + } + + private static class BoundedFakeFn extends DoFn<Integer, String> { + @ProcessElement + public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {} + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + } + + private static class UnboundedFakeFn extends DoFn<Integer, String> { + @ProcessElement + public ProcessContinuation processElement( + ProcessContext context, SomeRestrictionTracker tracker) { + return stop(); + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + } + + private static PCollection<Integer> makeUnboundedCollection(Pipeline pipeline) { + return pipeline + .apply("unbounded", Create.of(1, 2, 3)) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + } + + private static PCollection<Integer> makeBoundedCollection(Pipeline pipeline) { + return pipeline + .apply("bounded", Create.of(1, 2, 3)) + .setIsBoundedInternal(PCollection.IsBounded.BOUNDED); + } + + @Test + @Category(RunnableOnService.class) + public void testBoundednessForBoundedFn() { + Pipeline pipeline = TestPipeline.create(); + DoFn<Integer, 
String> boundedFn = new BoundedFakeFn(); + assertEquals( + PCollection.IsBounded.BOUNDED, + makeBoundedCollection(pipeline) + .apply("bounded to bounded", new SplittableParDo<>(boundedFn)) + .isBounded()); + assertEquals( + PCollection.IsBounded.BOUNDED, + makeUnboundedCollection(pipeline) + .apply("bounded to unbounded", new SplittableParDo<>(boundedFn)) + .isBounded()); + } + + @Test + @Category(RunnableOnService.class) + public void testBoundednessForUnboundedFn() { + Pipeline pipeline = TestPipeline.create(); + DoFn<Integer, String> unboundedFn = new UnboundedFakeFn(); + assertEquals( + PCollection.IsBounded.BOUNDED, + makeBoundedCollection(pipeline) + .apply("unbounded to bounded", new SplittableParDo<>(unboundedFn)) + .isBounded()); + assertEquals( + PCollection.IsBounded.BOUNDED, + makeUnboundedCollection(pipeline) + .apply("unbounded to unbounded", new SplittableParDo<>(unboundedFn)) + .isBounded()); + } + + // ------------------------------- Tests for ProcessFn --------------------------------- + + /** + * A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but possibly over multiple + * {@link DoFn.ProcessElement} calls). 
+ */ + private static class ProcessFnTester< + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> { + private final DoFnTester< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> + tester; + private Instant currentProcessingTime; + + ProcessFnTester( + Instant currentProcessingTime, + DoFn<InputT, OutputT> fn, + Coder<InputT> inputCoder, + Coder<RestrictionT> restrictionCoder) + throws Exception { + SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn = + new SplittableParDo.ProcessFn<>( + fn, inputCoder, restrictionCoder, IntervalWindow.getCoder()); + this.tester = DoFnTester.of(processFn); + this.tester.startBundle(); + this.tester.advanceProcessingTime(currentProcessingTime); + + this.currentProcessingTime = currentProcessingTime; + } + + /** Performs a seed {@link DoFn.ProcessElement} call feeding the element and restriction. */ + void startElement(InputT element, RestrictionT restriction) throws Exception { + startElement( + WindowedValue.of( + ElementAndRestriction.of(element, restriction), + currentProcessingTime, + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + + void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue) + throws Exception { + tester.processElement(KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue))); + } + + /** + * Advances processing time by a given duration and, if any timers fired, performs a non-seed + * {@link DoFn.ProcessElement} call, feeding it the timers. 
+ */ + boolean advanceProcessingTimeBy(Duration duration) throws Exception { + currentProcessingTime = currentProcessingTime.plus(duration); + List<TimerInternals.TimerData> timers = tester.advanceProcessingTime(currentProcessingTime); + if (timers.isEmpty()) { + return false; + } + tester.processElement( + KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem( + "key", timers)); + return true; + } + + List<TimestampedValue<OutputT>> peekOutputElementsInWindow(BoundedWindow window) { + return tester.peekOutputElementsInWindow(window); + } + + List<OutputT> takeOutputElements() { + return tester.takeOutputElements(); + } + } + + /** A simple splittable {@link DoFn} that's actually monolithic. */ + private static class ToStringFn extends DoFn<Integer, String> { + @ProcessElement + public void process(ProcessContext c, SomeRestrictionTracker tracker) { + c.output(c.element().toString() + "a"); + c.output(c.element().toString() + "b"); + c.output(c.element().toString() + "c"); + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer elem) { + return new SomeRestriction(); + } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return new SomeRestrictionTracker(); + } + } + + @Test + public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Exception { + // Tests that ProcessFn correctly propagates windows and timestamp of the element + // inside the KeyedWorkItem. + // The underlying DoFn is actually monolithic, so this doesn't test splitting. 
+ DoFn<Integer, String> fn = new ToStringFn(); + + Instant base = Instant.now(); + ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester = + new ProcessFnTester<>( + base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class)); + + IntervalWindow w1 = + new IntervalWindow( + base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1))); + IntervalWindow w2 = + new IntervalWindow( + base.minus(Duration.standardMinutes(2)), base.plus(Duration.standardMinutes(2))); + IntervalWindow w3 = + new IntervalWindow( + base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3))); + + tester.startElement( + WindowedValue.of( + ElementAndRestriction.of(42, new SomeRestriction()), + base, + Arrays.asList(w1, w2, w3), + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) { + assertEquals( + Arrays.asList( + TimestampedValue.of("42a", base), + TimestampedValue.of("42b", base), + TimestampedValue.of("42c", base)), + tester.peekOutputElementsInWindow(w)); + } + } + + /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. 
*/ + private static class SelfInitiatedResumeFn extends DoFn<Integer, String> { + @ProcessElement + public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) { + c.output(c.element().toString()); + return resume().withResumeDelay(Duration.standardSeconds(5)).withWatermark(c.timestamp()); + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer elem) { + return new SomeRestriction(); + } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return new SomeRestrictionTracker(); + } + } + + @Test + public void testResumeSetsTimer() throws Exception { + DoFn<Integer, String> fn = new SelfInitiatedResumeFn(); + Instant base = Instant.now(); + ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester = + new ProcessFnTester<>( + base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class)); + + tester.startElement(42, new SomeRestriction()); + assertThat(tester.takeOutputElements(), contains("42")); + + // Should resume after 5 seconds: advancing by 3 seconds should have no effect. + assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); + assertTrue(tester.takeOutputElements().isEmpty()); + + // 6 seconds should be enough - should invoke the fn again. + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); + assertThat(tester.takeOutputElements(), contains("42")); + + // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect. + assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); + assertTrue(tester.takeOutputElements().isEmpty()); + + // 6 seconds should again be enough. 
+ assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); + assertThat(tester.takeOutputElements(), contains("42")); + } + + private static class SomeCheckpoint implements Serializable { + private int firstUnprocessedIndex; + + private SomeCheckpoint(int firstUnprocessedIndex) { + this.firstUnprocessedIndex = firstUnprocessedIndex; + } + } + + private static class SomeCheckpointTracker implements RestrictionTracker<SomeCheckpoint> { + private SomeCheckpoint current; + private boolean isActive = true; + + private SomeCheckpointTracker(SomeCheckpoint current) { + this.current = current; + } + + @Override + public SomeCheckpoint currentRestriction() { + return current; + } + + public boolean tryUpdateCheckpoint(int firstUnprocessedIndex) { + if (!isActive) { + return false; + } + current = new SomeCheckpoint(firstUnprocessedIndex); + return true; + } + + @Override + public SomeCheckpoint checkpoint() { + isActive = false; + return current; + } + } + + /** + * A splittable {@link DoFn} that generates the sequence [init, init + total) in batches of given + * size. 
+ */ + private static class CounterFn extends DoFn<Integer, String> { + private final int numTotalOutputs; + private final int numOutputsPerCall; + + private CounterFn(int numTotalOutputs, int numOutputsPerCall) { + this.numTotalOutputs = numTotalOutputs; + this.numOutputsPerCall = numOutputsPerCall; + } + + @ProcessElement + public ProcessContinuation process(ProcessContext c, SomeCheckpointTracker tracker) { + int start = tracker.currentRestriction().firstUnprocessedIndex; + for (int i = 0; i < numOutputsPerCall; ++i) { + int index = start + i; + if (!tracker.tryUpdateCheckpoint(index + 1)) { + return resume(); + } + if (index >= numTotalOutputs) { + return stop(); + } + c.output(String.valueOf(c.element() + index)); + } + return resume(); + } + + @GetInitialRestriction + public SomeCheckpoint getInitialRestriction(Integer elem) { + throw new UnsupportedOperationException("Expected to be supplied explicitly in this test"); + } + + @NewTracker + public SomeCheckpointTracker newTracker(SomeCheckpoint restriction) { + return new SomeCheckpointTracker(restriction); + } + } + + @Test + public void testResumeCarriesOverState() throws Exception { + DoFn<Integer, String> fn = new CounterFn(3, 1); + Instant base = Instant.now(); + ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester = + new ProcessFnTester<>( + base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class)); + + tester.startElement(42, new SomeCheckpoint(0)); + assertThat(tester.takeOutputElements(), contains("42")); + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + assertThat(tester.takeOutputElements(), contains("43")); + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + assertThat(tester.takeOutputElements(), contains("44")); + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + // After outputting all 3 items, should not output anything more. 
+ assertEquals(0, tester.takeOutputElements().size()); + // Should also not ask to resume. + assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + } + + @Test + public void testReactsToCheckpoint() throws Exception { + int max = SplittableParDo.ProcessFn.MAX_OUTPUTS_PER_BUNDLE; + // Create an fn that attempts to 2x output more than checkpointing allows. + DoFn<Integer, String> fn = new CounterFn(2 * max + max / 2, 2 * max); + Instant base = Instant.now(); + int baseIndex = 42; + + ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester = + new ProcessFnTester<>( + base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class)); + + List<String> elements; + + tester.startElement(baseIndex, new SomeCheckpoint(0)); + elements = tester.takeOutputElements(); + assertEquals(max, elements.size()); + // Should output the range [0, max) + assertThat(elements, hasItem(String.valueOf(baseIndex))); + assertThat(elements, hasItem(String.valueOf(baseIndex + max - 1))); + + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + elements = tester.takeOutputElements(); + assertEquals(max, elements.size()); + // Should output the range [max, 2*max) + assertThat(elements, hasItem(String.valueOf(baseIndex + max))); + assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max - 1))); + + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + elements = tester.takeOutputElements(); + assertEquals(max / 2, elements.size()); + // Should output the range [2*max, 2*max + max/2) + assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max))); + assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max + max / 2 - 1))); + assertThat(elements, not(hasItem((String.valueOf(baseIndex + 2 * max + max / 2))))); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java new file mode 100644 index 0000000..e232552 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.runners.core.GBKIntoKeyedWorkItems; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItemCoder; +import org.apache.beam.sdk.util.ReifyTimestampsAndWindows; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +/** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */ +class DirectGBKIntoKeyedWorkItemsOverrideFactory implements PTransformOverrideFactory { + @Override + @SuppressWarnings("unchecked") + public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override( + PTransform<InputT, OutputT> transform) { + return new DirectGBKIntoKeyedWorkItems(transform.getName()); + } + + /** The Direct Runner specific implementation of {@link GBKIntoKeyedWorkItems}. 
*/ + private static class DirectGBKIntoKeyedWorkItems<KeyT, InputT> + extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> { + DirectGBKIntoKeyedWorkItems(String name) { + super(name); + } + + @Override + public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) { + checkArgument(input.getCoder() instanceof KvCoder); + KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder(); + return input + .apply(new ReifyTimestampsAndWindows<KeyT, InputT>()) + // TODO: Perhaps windowing strategy should instead be set by ReifyTAW, or by DGBKO + .setWindowingStrategyInternal(WindowingStrategy.globalDefault()) + .apply(new DirectGroupByKey.DirectGroupByKeyOnly<KeyT, InputT>()) + .setCoder( + KeyedWorkItemCoder.of( + kvCoder.getKeyCoder(), + kvCoder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 224101a..a72f7ae 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.annotation.Nullable; +import org.apache.beam.runners.core.GBKIntoKeyedWorkItems; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory; @@ -83,6 
+84,10 @@ public class DirectRunner .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory()) .put(TestStream.class, new DirectTestStreamFactory()) .put(Write.Bound.class, new WriteWithShardingFactory()) + .put(ParDo.Bound.class, new ParDoOverrideFactory()) + .put( + GBKIntoKeyedWorkItems.class, + new DirectGBKIntoKeyedWorkItemsOverrideFactory()) .build(); /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java new file mode 100644 index 0000000..a57735c --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +/** + * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo} + * in the direct runner. Currently overrides applications of <a + * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>. + */ +class ParDoOverrideFactory implements PTransformOverrideFactory { + @Override + @SuppressWarnings("unchecked") + public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override( + PTransform<InputT, OutputT> transform) { + if (!(transform instanceof ParDo.Bound)) { + return transform; + } + ParDo.Bound<InputT, OutputT> that = (ParDo.Bound<InputT, OutputT>) transform; + DoFn<InputT, OutputT> fn = DoFnAdapters.getDoFn(that.getFn()); + if (fn == null) { + // This is an OldDoFn, hence not splittable. 
+ return transform; + } + DoFnSignature signature = DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()); + if (!signature.processElement().isSplittable()) { + return transform; + } + return new SplittableParDo(fn); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java new file mode 100644 index 0000000..84a0cd9 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.MutableDateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} behavior + * using the direct runner. + * + * <p>TODO: make this use @RunnableOnService. 
+ */ +@RunWith(JUnit4.class) +public class SplittableDoFnTest { + static class OffsetRange implements Serializable { + public final int from; + public final int to; + + OffsetRange(int from, int to) { + this.from = from; + this.to = to; + } + } + + private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> { + private OffsetRange range; + private Integer lastClaimedIndex = null; + + OffsetRangeTracker(OffsetRange range) { + this.range = checkNotNull(range); + } + + @Override + public OffsetRange currentRestriction() { + return range; + } + + @Override + public OffsetRange checkpoint() { + if (lastClaimedIndex == null) { + OffsetRange res = range; + range = new OffsetRange(range.from, range.from); + return res; + } + OffsetRange res = new OffsetRange(lastClaimedIndex + 1, range.to); + this.range = new OffsetRange(range.from, lastClaimedIndex + 1); + return res; + } + + boolean tryClaim(int i) { + checkState(lastClaimedIndex == null || i > lastClaimedIndex); + if (i >= range.to) { + return false; + } + lastClaimedIndex = i; + return true; + } + } + + static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> { + @ProcessElement + public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) { + for (int i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) { + c.output(KV.of(c.element(), i)); + if (i % 3 == 0) { + return ProcessContinuation.resume(); + } + } + return ProcessContinuation.stop(); + } + + @GetInitialRestriction + public OffsetRange getInitialRange(String element) { + return new OffsetRange(0, element.length()); + } + + @SplitRestriction + public void splitRange( + String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) { + receiver.output(new OffsetRange(range.from, (range.from + range.to) / 2)); + receiver.output(new OffsetRange((range.from + range.to) / 2, range.to)); + } + + @NewTracker + public OffsetRangeTracker newTracker(OffsetRange range) { + return 
new OffsetRangeTracker(range); + } + } + + private static class ReifyTimestampsFn<T> extends DoFn<T, TimestampedValue<T>> { + @ProcessElement + public void process(ProcessContext c) { + c.output(TimestampedValue.of(c.element(), c.timestamp())); + } + } + + @Test + public void testPairWithIndexBasic() throws ClassNotFoundException { + Pipeline p = TestPipeline.create(); + p.getOptions().setRunner(DirectRunner.class); + PCollection<KV<String, Integer>> res = + p.apply(Create.of("a", "bb", "ccccc")) + .apply(ParDo.of(new PairStringWithIndexToLength())) + .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())); + + PAssert.that(res) + .containsInAnyOrder( + Arrays.asList( + KV.of("a", 0), + KV.of("bb", 0), + KV.of("bb", 1), + KV.of("ccccc", 0), + KV.of("ccccc", 1), + KV.of("ccccc", 2), + KV.of("ccccc", 3), + KV.of("ccccc", 4))); + + p.run(); + } + + @Test + public void testPairWithIndexWindowedTimestamped() throws ClassNotFoundException { + // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps + // of elements in the input collection. 
+ Pipeline p = TestPipeline.create(); + p.getOptions().setRunner(DirectRunner.class); + + MutableDateTime mutableNow = Instant.now().toMutableDateTime(); + mutableNow.setMillisOfSecond(0); + Instant now = mutableNow.toInstant(); + Instant nowP1 = now.plus(Duration.standardSeconds(1)); + Instant nowP2 = now.plus(Duration.standardSeconds(2)); + + SlidingWindows windowFn = + SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)); + PCollection<KV<String, Integer>> res = + p.apply( + Create.timestamped( + TimestampedValue.of("a", now), + TimestampedValue.of("bb", nowP1), + TimestampedValue.of("ccccc", nowP2))) + .apply(Window.<String>into(windowFn)) + .apply(ParDo.of(new PairStringWithIndexToLength())) + .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())); + + assertEquals(windowFn, res.getWindowingStrategy().getWindowFn()); + + PCollection<TimestampedValue<KV<String, Integer>>> timestamped = + res.apply("Reify timestamps", ParDo.of(new ReifyTimestampsFn<KV<String, Integer>>())); + + for (int i = 0; i < 4; ++i) { + Instant base = now.minus(Duration.standardSeconds(i)); + IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5))); + + List<TimestampedValue<KV<String, Integer>>> expectedUnfiltered = + Arrays.asList( + TimestampedValue.of(KV.of("a", 0), now), + TimestampedValue.of(KV.of("bb", 0), nowP1), + TimestampedValue.of(KV.of("bb", 1), nowP1), + TimestampedValue.of(KV.of("ccccc", 0), nowP2), + TimestampedValue.of(KV.of("ccccc", 1), nowP2), + TimestampedValue.of(KV.of("ccccc", 2), nowP2), + TimestampedValue.of(KV.of("ccccc", 3), nowP2), + TimestampedValue.of(KV.of("ccccc", 4), nowP2)); + + List<TimestampedValue<KV<String, Integer>>> expected = new ArrayList<>(); + for (TimestampedValue<KV<String, Integer>> tv : expectedUnfiltered) { + if (!window.start().isAfter(tv.getTimestamp()) + && !tv.getTimestamp().isAfter(window.maxTimestamp())) { + expected.add(tv); + } + } + 
assertFalse(expected.isEmpty()); + + PAssert.that(timestamped).inWindow(window).containsInAnyOrder(expected); + } + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index f806926..789f4b2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -76,6 +76,12 @@ public @interface Experimental { TIMERS, /** Experimental APIs related to customizing the output time for computed values. */ - OUTPUT_TIME + OUTPUT_TIME, + + /** + * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>. + * Do not use: API is unstable and runner support is incomplete. 
+ */ + SPLITTABLE_DO_FN, } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index fb7fbd4..62da28c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.auto.value.AutoValue; import java.io.Serializable; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -29,6 +30,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.util.HashMap; import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.options.PipelineOptions; @@ -37,9 +40,11 @@ import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import 
org.apache.beam.sdk.values.TypeDescriptor; @@ -341,14 +346,20 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD */ @Deprecated WindowingInternals<InputT, OutputT> windowingInternals(); + + /** + * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with + * the current {@link ProcessElement} call. + */ + <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker(); } - /** A placeholder for testing handling of output types during {@link DoFn} reflection. */ + /** Receives values of the given type. */ public interface OutputReceiver<T> { void output(T output); } - /** A placeholder for testing handling of input types during {@link DoFn} reflection. */ + /** Provides a single value of the given type. */ public interface InputProvider<T> { T get(); } @@ -375,6 +386,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD public WindowingInternals<InputT, OutputT> windowingInternals() { return null; } + + public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { + return null; + } } ///////////////////////////////////////////////////////////////////////////// @@ -412,14 +427,57 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD public @interface StartBundle {} /** - * Annotation for the method to use for processing elements. A subclass of - * {@link DoFn} must have a method with this annotation satisfying - * the following constraints in order for it to be executable: + * Annotation for the method to use for processing elements. A subclass of {@link DoFn} must have + * a method with this annotation. + * + * <p>The signature of this method must satisfy the following constraints: + * * <ul> - * <li>It must have at least one argument. - * <li>Its first argument must be a {@link DoFn.ProcessContext}. - * <li>Its remaining argument, if any, must be {@link BoundedWindow}. 
+ * <li>Its first argument must be a {@link DoFn.ProcessContext}. + * <li>If one of its arguments is a subtype of {@link RestrictionTracker}, then it is a <a + * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} subject to the + * separate requirements described below. The items below assume this is not a splittable + * {@link DoFn}. + * <li>If one of its arguments is {@link BoundedWindow}, this argument corresponds to the window + * of the current element. If absent, a runner may perform additional optimizations. + * <li>It must return {@code void}. * </ul> + * + * <h2>Splittable DoFn's (WARNING: work in progress, do not use)</h2> + * + * <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} method has a parameter + * whose type is a subtype of {@link RestrictionTracker}. This is an advanced feature and an + * overwhelming majority of users will never need to write a splittable {@link DoFn}. Right now + * the implementation of this feature is in progress and it's not ready for any use. + * + * <p>See <a href="https://s.apache.org/splittable-do-fn">the proposal</a> for an overview of the + * involved concepts (<i>splittable DoFn</i>, <i>restriction</i>, <i>restriction tracker</i>). + * + * <p>If a {@link DoFn} is splittable, the following constraints must be respected: + * + * <ul> + * <li>It <i>must</i> define a {@link GetInitialRestriction} method. + * <li>It <i>may</i> define a {@link SplitRestriction} method. + * <li>It <i>must</i> define a {@link NewTracker} method returning the same type as the type of + * the {@link RestrictionTracker} argument of {@link ProcessElement}, which in turn must be a + * subtype of {@code RestrictionTracker<R>} where {@code R} is the restriction type returned + * by {@link GetInitialRestriction}. + * <li>It <i>may</i> define a {@link GetRestrictionCoder} method. + * <li>The type of restrictions used by all of these methods must be the same. 
+ * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to + * indicate whether there is more work to be done for the current element. + * <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such as + * {@link BoundedWindow}. + * <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or + * {@link UnboundedPerElement}, but not both at the same time. If it's not annotated with + * either of these, it's assumed to be {@link BoundedPerElement} if its {@link + * ProcessElement} method returns {@code void} and {@link UnboundedPerElement} if it + * returns a {@link ProcessContinuation}. + * </ul> + * + * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods. + * + * <p>More documentation will be added when the feature becomes ready for general usage. */ @Documented @Retention(RetentionPolicy.RUNTIME) @@ -455,6 +513,150 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD } /** + * Annotation for the method that maps an element to an initial restriction for a <a + * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}. + * + * <p>Signature: {@code RestrictionT getInitialRestriction(InputT element);} + * + * <p>TODO: Make the InputT parameter optional. + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @Experimental(Kind.SPLITTABLE_DO_FN) + public @interface GetInitialRestriction {} + + /** + * Annotation for the method that returns the coder to use for the restriction of a <a + * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}. + * + * <p>If not defined, a coder will be inferred using standard coder inference rules and the + * pipeline's {@link Pipeline#getCoderRegistry coder registry}. + * + * <p>This method will be called only at pipeline construction time. 
+ * + * <p>Signature: {@code Coder<RestrictionT> getRestrictionCoder();} + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @Experimental(Kind.SPLITTABLE_DO_FN) + public @interface GetRestrictionCoder {} + + /** + * Annotation for the method that splits the restriction of a <a + * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into multiple parts to + * be processed in parallel. + * + * <p>Signature: {@code void splitRestriction(InputT element, RestrictionT restriction, + * OutputReceiver<RestrictionT> receiver);} + * + * <p>Optional: if this method is omitted, the restriction will not be split (equivalent to + * defining the method and outputting only {@code restriction} to the receiver). + * + * <p>TODO: Introduce a parameter for controlling granularity of splitting, e.g. numParts. TODO: + * Make the InputT parameter optional. + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @Experimental(Kind.SPLITTABLE_DO_FN) + public @interface SplitRestriction {} + + /** + * Annotation for the method that creates a new {@link RestrictionTracker} for the restriction of + * a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}. + * + * <p>Signature: {@code MyRestrictionTracker newTracker(RestrictionT restriction);} where {@code + * MyRestrictionTracker} must be a subtype of {@code RestrictionTracker<RestrictionT>}. + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @Experimental(Kind.SPLITTABLE_DO_FN) + public @interface NewTracker {} + + /** + * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} + * specifying that the {@link DoFn} performs a bounded amount of work per input element, so + * applying it to a bounded {@link PCollection} will also produce a bounded {@link PCollection}. + * It is an error to specify this on a non-splittable {@link DoFn}. 
+ */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.TYPE) + @Experimental(Kind.SPLITTABLE_DO_FN) + public @interface BoundedPerElement {} + + /** + * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} + * specifying that the {@link DoFn} performs an unbounded amount of work per input element, so + * applying it to a bounded {@link PCollection} will produce an unbounded {@link PCollection}. It + * is an error to specify this on a non-splittable {@link DoFn}. + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.TYPE) + @Experimental(Kind.SPLITTABLE_DO_FN) + public @interface UnboundedPerElement {} + + // This can't be put into ProcessContinuation itself due to the following problem: + // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html + private static final ProcessContinuation PROCESS_CONTINUATION_STOP = + new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO, null); + + /** + * When used as a return value of {@link ProcessElement}, indicates whether there is more work to + * be done for the current element. + */ + @Experimental(Kind.SPLITTABLE_DO_FN) + @AutoValue + public abstract static class ProcessContinuation { + /** Indicates that there is no more work to be done for the current element. */ + public static ProcessContinuation stop() { + return PROCESS_CONTINUATION_STOP; + } + + /** Indicates that there is more work to be done for the current element. */ + public static ProcessContinuation resume() { + return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO, null); + } + + /** + * If false, the {@link DoFn} promises that there is no more work remaining for the current + * element, so the runner should not resume the {@link ProcessElement} call. 
+ */ + public abstract boolean shouldResume(); + + /** + * A minimum duration that should elapse between the end of this {@link ProcessElement} call and + * the {@link ProcessElement} call continuing processing of the same element. By default, zero. + */ + public abstract Duration resumeDelay(); + + /** + * A lower bound provided by the {@link DoFn} on timestamps of the output that will be emitted + * by future {@link ProcessElement} calls continuing processing of the current element. + * + * <p>A runner should treat an absent value as equivalent to the timestamp of the input element. + */ + @Nullable + public abstract Instant getWatermark(); + + /** Builder method to set the value of {@link #resumeDelay()}. */ + public ProcessContinuation withResumeDelay(Duration resumeDelay) { + return new AutoValue_DoFn_ProcessContinuation( + shouldResume(), resumeDelay, getWatermark()); + } + + /** Builder method to set the value of {@link #getWatermark()}. */ + public ProcessContinuation withWatermark(Instant watermark) { + return new AutoValue_DoFn_ProcessContinuation( + shouldResume(), resumeDelay(), watermark); + } + } + + /** * Returns an {@link Aggregator} with aggregation logic specified by the * {@link CombineFn} argument. The name provided must be unique across * {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be created