Repository: incubator-beam Updated Branches: refs/heads/master fd4b631f1 -> 24fab9f53
Improve Splittable DoFn. Makes Splittable DoFn behave more like a regular DoFn: - Adds support for side inputs and side outputs to SDF. - Teaches `ProcessFn` to work with exploded windows inside the `KeyedWorkItem`: it un-explodes the windows in the `Iterable<WindowedValue<ElementAndRestriction>>` into a single `WindowedValue`, which is valid because the values and timestamps are guaranteed to be the same. Makes `SplittableParDo.ProcessFn` stop using the (now unavailable) `OldDoFn` state and timers API: - Runs `ProcessFn` inside a primitive transform (`SplittableParDo.ProcessElements`) with its own `ParDoEvaluator`. As a nice side effect, this enables the runner to provide additional hooks into it, e.g. giving the runner access to the restriction tracker (in later PRs). - For consistency, moves the declaration of the `GBKIntoKeyedWorkItems` primitive transform into `SplittableParDo`, alongside the `ProcessElements` transform. - Preserves the compressed representation of `WindowedValue`s in `PushbackSideInputDoFnRunner`. - Uses `OutputWindowedValue` in `SplittableParDo.ProcessFn`. Proper lifecycle management for the wrapped fn: - Caches the underlying fn using `DoFnLifecycleManager`, so that its `@Setup` and `@Teardown` methods are called. - Calls the `@StartBundle` and `@FinishBundle` methods on the underlying fn explicitly. Output from them is prohibited, since an SDF is only allowed to output after a successful `RestrictionTracker.tryClaim`. It is possible that an SDF should not be allowed to have `@StartBundle`/`@FinishBundle` methods at all, but this is not yet certain.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87ff5ac3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87ff5ac3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87ff5ac3 Branch: refs/heads/master Commit: 87ff5ac36bb9cc62fa4864ffa7b5a5e495b9a4a1 Parents: fd4b631 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Wed Oct 26 16:05:01 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Thu Dec 1 14:15:55 2016 -0800 ---------------------------------------------------------------------- .../core/ElementAndRestrictionCoder.java | 8 + .../runners/core/GBKIntoKeyedWorkItems.java | 55 --- .../beam/runners/core/SplittableParDo.java | 378 +++++++++++++++---- .../beam/runners/core/SplittableParDoTest.java | 134 +++++-- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 41 +- .../beam/runners/direct/DirectGroupByKey.java | 2 +- .../beam/runners/direct/DirectRunner.java | 8 +- .../runners/direct/DoFnLifecycleManager.java | 4 +- .../beam/runners/direct/ParDoEvaluator.java | 26 +- .../runners/direct/ParDoEvaluatorFactory.java | 63 +++- .../direct/ParDoMultiOverrideFactory.java | 2 +- ...littableProcessElementsEvaluatorFactory.java | 144 +++++++ .../direct/TransformEvaluatorRegistry.java | 5 + .../beam/runners/direct/SplittableDoFnTest.java | 194 +++++++++- .../org/apache/beam/sdk/transforms/DoFn.java | 12 + .../apache/beam/sdk/transforms/DoFnTester.java | 51 ++- .../sdk/util/state/TimerInternalsFactory.java | 36 ++ 17 files changed, 905 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java ---------------------------------------------------------------------- diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java index 6dec8e2..64c1e14 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java @@ -64,4 +64,12 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT> RestrictionT value = restrictionCoder.decode(inStream, context); return ElementAndRestriction.of(key, value); } + + public Coder<ElementT> getElementCoder() { + return elementCoder; + } + + public Coder<RestrictionT> getRestrictionCoder() { + return restrictionCoder; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java deleted file mode 100644 index 304e349..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static com.google.common.base.Preconditions.checkArgument; - -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItemCoder; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -/** - * Interface for creating a runner-specific {@link GroupByKey GroupByKey-like} {@link PTransform} - * that produces {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state - * and timers. 
- */ -@Experimental(Experimental.Kind.SPLITTABLE_DO_FN) -public class GBKIntoKeyedWorkItems<KeyT, InputT> - extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> { - @Override - public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) { - checkArgument(input.getCoder() instanceof KvCoder, - "Expected input coder to be KvCoder, but was %s", - input.getCoder().getClass().getSimpleName()); - - KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder(); - Coder<KeyedWorkItem<KeyT, InputT>> coder = KeyedWorkItemCoder.of( - kvCoder.getKeyCoder(), kvCoder.getValueCoder(), - input.getWindowingStrategy().getWindowFn().windowCoder()); - PCollection<KeyedWorkItem<KeyT, InputT>> collection = PCollection.createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); - collection.setCoder((Coder) coder); - return collection; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index c38ab2f..80fd17b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -19,17 +19,22 @@ package org.apache.beam.runners.core; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import java.util.List; import 
java.util.UUID; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -45,21 +50,30 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItemCoder; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.TimerInternalsFactory; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import 
org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypedPValue; import org.joda.time.Instant; /** @@ -80,31 +94,53 @@ import org.joda.time.Instant; * ParDo.of(splittable DoFn)}, but not for direct use by pipeline writers. */ @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) -public class SplittableParDo< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> - extends PTransform<PCollection<InputT>, PCollection<OutputT>> { - private final DoFn<InputT, OutputT> fn; - private final DoFnSignature signature; +public class SplittableParDo<InputT, OutputT, RestrictionT> + extends PTransform<PCollection<InputT>, PCollectionTuple> { + private final ParDo.BoundMulti<InputT, OutputT> parDo; /** - * Creates the transform for the given original {@link ParDo} and {@link DoFn}. + * Creates the transform for the given original multi-output {@link ParDo}. * - * @param fn The splittable {@link DoFn} inside the original {@link ParDo} transform. + * @param parDo The splittable {@link ParDo} transform. 
*/ - public SplittableParDo(DoFn<InputT, OutputT> fn) { - checkNotNull(fn, "fn must not be null"); - this.fn = fn; - this.signature = DoFnSignatures.getSignature(fn.getClass()); - checkArgument(signature.processElement().isSplittable(), "fn must be a splittable DoFn"); + public SplittableParDo(ParDo.BoundMulti<InputT, OutputT> parDo) { + checkNotNull(parDo, "parDo must not be null"); + this.parDo = parDo; + checkArgument( + DoFnSignatures.getSignature(parDo.getNewFn().getClass()).processElement().isSplittable(), + "fn must be a splittable DoFn"); } @Override - public PCollection<OutputT> apply(PCollection<InputT> input) { - PCollection.IsBounded isFnBounded = signature.isBoundedPerElement(); + public PCollectionTuple apply(PCollection<InputT> input) { + return applyTyped(input); + } + + private PCollectionTuple applyTyped(PCollection<InputT> input) { + DoFn<InputT, OutputT> fn = parDo.getNewFn(); Coder<RestrictionT> restrictionCoder = - DoFnInvokers - .invokerFor(fn) + DoFnInvokers.invokerFor(fn) .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry()); + PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> keyedWorkItems = + applySplitIntoKeyedWorkItems(input, fn, restrictionCoder); + return keyedWorkItems.apply( + "Process", + new ProcessElements<>( + fn, + input.getCoder(), + restrictionCoder, + input.getWindowingStrategy(), + parDo.getSideInputs(), + parDo.getMainOutputTag(), + parDo.getSideOutputTags())); + } + + private static <InputT, OutputT, RestrictionT> + PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> + applySplitIntoKeyedWorkItems( + PCollection<InputT> input, + DoFn<InputT, OutputT> fn, + Coder<RestrictionT> restrictionCoder) { Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder = ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder); @@ -121,23 +157,133 @@ public class SplittableParDo< WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, 
RestrictionT>>())) .apply( "Group by key", - new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>()); + new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>()) + .setCoder( + KeyedWorkItemCoder.of( + StringUtf8Coder.of(), + splitCoder, + input.getWindowingStrategy().getWindowFn().windowCoder())); checkArgument( keyedWorkItems.getWindowingStrategy().getWindowFn() instanceof GlobalWindows, "GBKIntoKeyedWorkItems must produce a globally windowed collection, " + "but windowing strategy was: %s", keyedWorkItems.getWindowingStrategy()); - return keyedWorkItems - .apply( - "Process", - ParDo.of( - new ProcessFn<InputT, OutputT, RestrictionT, TrackerT>( - fn, - input.getCoder(), - restrictionCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()))) - .setIsBoundedInternal(input.isBounded().and(isFnBounded)) - .setWindowingStrategyInternal(input.getWindowingStrategy()); + return keyedWorkItems; + } + + /** + * Runner-specific primitive {@link GroupByKey GroupByKey-like} {@link PTransform} that produces + * {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state and timers. + * + * <p>Unlike a real {@link GroupByKey}, ignores the input's windowing and triggering strategy and + * emits output immediately. + */ + public static class GBKIntoKeyedWorkItems<KeyT, InputT> + extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> { + @Override + public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded()); + } + } + + /** + * Runner-specific primitive {@link PTransform} that invokes the {@link DoFn.ProcessElement} + * method for a splittable {@link DoFn}. + */ + public static class ProcessElements<InputT, OutputT, RestrictionT> + extends PTransform< + PCollection<? 
extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>, + PCollectionTuple> { + private final DoFn<InputT, OutputT> fn; + private final Coder<InputT> elementCoder; + private final Coder<RestrictionT> restrictionCoder; + private final WindowingStrategy<?, ?> windowingStrategy; + private final List<PCollectionView<?>> sideInputs; + private final TupleTag<OutputT> mainOutputTag; + private final TupleTagList sideOutputTags; + + /** + * @param fn the splittable {@link DoFn}. + * @param windowingStrategy the {@link WindowingStrategy} of the input collection. + * @param sideInputs list of side inputs that should be available to the {@link DoFn}. + * @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output. + * @param sideOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} side outputs. + */ + public ProcessElements( + DoFn<InputT, OutputT> fn, + Coder<InputT> elementCoder, + Coder<RestrictionT> restrictionCoder, + WindowingStrategy<?, ?> windowingStrategy, + List<PCollectionView<?>> sideInputs, + TupleTag<OutputT> mainOutputTag, + TupleTagList sideOutputTags) { + this.fn = fn; + this.elementCoder = elementCoder; + this.restrictionCoder = restrictionCoder; + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + this.mainOutputTag = mainOutputTag; + this.sideOutputTags = sideOutputTags; + } + + public DoFn<InputT, OutputT> getFn() { + return fn; + } + + public List<PCollectionView<?>> getSideInputs() { + return sideInputs; + } + + public TupleTag<OutputT> getMainOutputTag() { + return mainOutputTag; + } + + public TupleTagList getSideOutputTags() { + return sideOutputTags; + } + + public ProcessFn<InputT, OutputT, RestrictionT, ?> newProcessFn(DoFn<InputT, OutputT> fn) { + return new SplittableParDo.ProcessFn<>( + fn, elementCoder, restrictionCoder, windowingStrategy.getWindowFn().windowCoder()); + } + + @Override + public PCollectionTuple apply( + PCollection<? 
extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> + input) { + DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); + PCollectionTuple outputs = + PCollectionTuple.ofPrimitiveOutputsInternal( + input.getPipeline(), + TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()), + windowingStrategy, + input.isBounded().and(signature.isBoundedPerElement())); + + // Set output type descriptor similarly to how ParDo.BoundMulti does it. + outputs.get(mainOutputTag).setTypeDescriptorInternal(fn.getOutputTypeDescriptor()); + + return outputs; + } + + @Override + public <T> Coder<T> getDefaultOutputCoder( + PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> + input, + TypedPValue<T> output) + throws CannotProvideCoderException { + // Similar logic to ParDo.BoundMulti.getDefaultOutputCoder. + @SuppressWarnings("unchecked") + KeyedWorkItemCoder<String, ElementAndRestriction<InputT, RestrictionT>> kwiCoder = + (KeyedWorkItemCoder) input.getCoder(); + Coder<InputT> inputCoder = + ((ElementAndRestrictionCoder<InputT, RestrictionT>) kwiCoder.getElementCoder()) + .getElementCoder(); + return input + .getPipeline() + .getCoderRegistry() + .getDefaultCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder); + } } /** @@ -182,15 +328,11 @@ public class SplittableParDo< * The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair * by creating a tracker for the restriction and checkpointing/resuming processing later if * necessary. - * - * <p>TODO: This uses deprecated OldDoFn since DoFn does not provide access to state/timer - * internals. This should be rewritten to use the <a href="https://s.apache.org/beam-state">State - * and Timers API</a> once it is available. 
*/ @VisibleForTesting - static class ProcessFn< + public static class ProcessFn< InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> - extends OldDoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> { + extends DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> { // Commit at least once every 10k output records. This keeps the watermark advancing // smoothly, and ensures that not too much work will have to be reprocessed in the event of // a crash. @@ -227,30 +369,56 @@ public class SplittableParDo< */ private StateTag<Object, ValueState<RestrictionT>> restrictionTag; + private transient StateInternalsFactory<String> stateInternalsFactory; + private transient TimerInternalsFactory<String> timerInternalsFactory; + private transient OutputWindowedValue<OutputT> outputWindowedValue; + private final DoFn<InputT, OutputT> fn; private final Coder<? extends BoundedWindow> windowCoder; private transient DoFnInvoker<InputT, OutputT> invoker; - ProcessFn( + public ProcessFn( DoFn<InputT, OutputT> fn, Coder<InputT> elementCoder, Coder<RestrictionT> restrictionCoder, Coder<? 
extends BoundedWindow> windowCoder) { this.fn = fn; + this.invoker = DoFnInvokers.invokerFor(fn); this.windowCoder = windowCoder; - elementTag = + this.elementTag = StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder)); - restrictionTag = StateTags.value("restriction", restrictionCoder); + this.restrictionTag = StateTags.value("restriction", restrictionCoder); } - @Override - public void setup() throws Exception { - invoker = DoFnInvokers.invokerFor(fn); + public void setStateInternalsFactory(StateInternalsFactory<String> stateInternalsFactory) { + this.stateInternalsFactory = stateInternalsFactory; } - @Override + public void setTimerInternalsFactory(TimerInternalsFactory<String> timerInternalsFactory) { + this.timerInternalsFactory = timerInternalsFactory; + } + + public void setOutputWindowedValue(OutputWindowedValue<OutputT> outputWindowedValue) { + this.outputWindowedValue = outputWindowedValue; + } + + @StartBundle + public void startBundle(Context c) throws Exception { + invoker.invokeStartBundle(wrapContext(c)); + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + invoker.invokeFinishBundle(wrapContext(c)); + } + + @ProcessElement public void processElement(final ProcessContext c) { + StateInternals<String> stateInternals = + stateInternalsFactory.stateInternalsForKey(c.element().key()); + TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(c.element().key()); + // Initialize state (element and restriction) depending on whether this is the seed call. // The seed call is the first call for this element, which actually has the element. // Subsequent calls are timer firings and the element has to be retrieved from the state. @@ -258,17 +426,23 @@ public class SplittableParDo< boolean isSeedCall = (timer == null); StateNamespace stateNamespace = isSeedCall ? 
StateNamespaces.global() : timer.getNamespace(); ValueState<WindowedValue<InputT>> elementState = - c.windowingInternals().stateInternals().state(stateNamespace, elementTag); + stateInternals.state(stateNamespace, elementTag); ValueState<RestrictionT> restrictionState = - c.windowingInternals().stateInternals().state(stateNamespace, restrictionTag); + stateInternals.state(stateNamespace, restrictionTag); WatermarkHoldState<GlobalWindow> holdState = - c.windowingInternals().stateInternals().state(stateNamespace, watermarkHoldTag); + stateInternals.state(stateNamespace, watermarkHoldTag); ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction; if (isSeedCall) { // The element and restriction are available in c.element(). + // elementsIterable() will, by construction of SplittableParDo, contain the same value + // potentially in several different windows. We implode this into a single WindowedValue + // in order to simplify the rest of the code and avoid iterating over elementsIterable() + // explicitly. The windows of this WindowedValue will be propagated to windows of the + // output. This is correct because a splittable DoFn is not allowed to inspect the window + // of its element. WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue = - Iterables.getOnlyElement(c.element().elementsIterable()); + implodeWindows(c.element().elementsIterable()); WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element()); elementState.write(element); elementAndRestriction = @@ -290,7 +464,7 @@ public class SplittableParDo< DoFn.ProcessContinuation cont = invoker.invokeProcessElement( wrapTracker( - tracker, makeContext(c, elementAndRestriction.element(), tracker, residual))); + tracker, wrapContext(c, elementAndRestriction.element(), tracker, residual))); if (residual[0] == null) { // This means the call completed unsolicited, and the context produced by makeContext() // did not take a checkpoint. 
Take one now. @@ -307,19 +481,85 @@ public class SplittableParDo< } restrictionState.write(residual[0]); Instant futureOutputWatermark = cont.getWatermark(); - if (futureOutputWatermark != null) { - holdState.add(futureOutputWatermark); + if (futureOutputWatermark == null) { + futureOutputWatermark = elementAndRestriction.element().getTimestamp(); } + Instant wakeupTime = timerInternals.currentProcessingTime().plus(cont.resumeDelay()); + holdState.add(futureOutputWatermark); // Set a timer to continue processing this element. - TimerInternals timerInternals = c.windowingInternals().timerInternals(); timerInternals.setTimer( - TimerInternals.TimerData.of( - stateNamespace, - timerInternals.currentProcessingTime().plus(cont.resumeDelay()), - TimeDomain.PROCESSING_TIME)); + TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME)); + } + + /** + * Does the opposite of {@link WindowedValue#explodeWindows()} - creates a single {@link + * WindowedValue} from a collection of {@link WindowedValue}'s that is known to contain copies + * of the same value with the same timestamp, but different window sets. + * + * <p>This is only legal to do because we know that {@link RandomUniqueKeyFn} created unique + * keys for every {@link ElementAndRestriction}, so if there's multiple {@link WindowedValue}'s + * for the same key, that means only that the windows of that {@link ElementAndRestriction} are + * being delivered separately rather than all at once. It is also legal to do because splittable + * {@link DoFn} is not allowed to access the window of its element, so we can propagate the full + * set of windows of its input to its output. 
+ */ + private static <InputT, RestrictionT> + WindowedValue<ElementAndRestriction<InputT, RestrictionT>> implodeWindows( + Iterable<WindowedValue<ElementAndRestriction<InputT, RestrictionT>>> values) { + WindowedValue<ElementAndRestriction<InputT, RestrictionT>> first = + Iterables.getFirst(values, null); + checkState(first != null, "Got a KeyedWorkItem with no elements and no timers"); + ImmutableList.Builder<BoundedWindow> windows = ImmutableList.builder(); + for (WindowedValue<ElementAndRestriction<InputT, RestrictionT>> value : values) { + windows.addAll(value.getWindows()); + } + return WindowedValue.of( + first.getValue(), first.getTimestamp(), windows.build(), first.getPane()); + } + + private DoFn<InputT, OutputT>.Context wrapContext(final Context baseContext) { + return fn.new Context() { + @Override + public PipelineOptions getPipelineOptions() { + return baseContext.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + throwUnsupportedOutput(); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + throwUnsupportedOutput(); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + throwUnsupportedOutput(); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + throwUnsupportedOutput(); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( + String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + return fn.createAggregator(name, combiner); + } + + private void throwUnsupportedOutput() { + throw new UnsupportedOperationException( + String.format( + "Splittable DoFn can only output from @%s", + ProcessElement.class.getSimpleName())); + } + }; } - private DoFn<InputT, OutputT>.ProcessContext makeContext( + private DoFn<InputT, OutputT>.ProcessContext wrapContext( final ProcessContext baseContext, final WindowedValue<InputT> element, final TrackerT tracker, 
@@ -340,17 +580,14 @@ public class SplittableParDo< } public void output(OutputT output) { - baseContext - .windowingInternals() - .outputWindowedValue( - output, element.getTimestamp(), element.getWindows(), element.getPane()); + outputWindowedValue.outputWindowedValue( + output, element.getTimestamp(), element.getWindows(), element.getPane()); noteOutput(); } public void outputWithTimestamp(OutputT output, Instant timestamp) { - baseContext - .windowingInternals() - .outputWindowedValue(output, timestamp, element.getWindows(), element.getPane()); + outputWindowedValue.outputWindowedValue( + output, timestamp, element.getWindows(), element.getPane()); noteOutput(); } @@ -370,17 +607,15 @@ public class SplittableParDo< } public <T> void sideOutput(TupleTag<T> tag, T output) { - // TODO: I'm not sure how to implement this correctly: there's no - // "internals.sideOutputWindowedValue". - throw new UnsupportedOperationException( - "Side outputs not yet supported by splittable DoFn"); + outputWindowedValue.sideOutputWindowedValue( + tag, output, element.getTimestamp(), element.getWindows(), element.getPane()); + noteOutput(); } public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - // TODO: I'm not sure how to implement this correctly: there's no - // "internals.sideOutputWindowedValue". - throw new UnsupportedOperationException( - "Side outputs not yet supported by splittable DoFn"); + outputWindowedValue.sideOutputWindowedValue( + tag, output, timestamp, element.getWindows(), element.getPane()); + noteOutput(); } @Override @@ -393,8 +628,7 @@ public class SplittableParDo< /** * Creates an {@link DoFnInvoker.ArgumentProvider} that provides the given tracker as well as - * the given - * {@link ProcessContext} (which is also provided when a {@link Context} is requested. + * the given {@link ProcessContext} (which is also provided when a {@link Context} is requested. 
*/ private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapTracker( TrackerT tracker, DoFn<InputT, OutputT>.ProcessContext processContext) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index 29ff838..990d892 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -29,6 +29,7 @@ import static org.junit.Assert.assertTrue; import java.io.Serializable; import java.util.Arrays; +import java.util.Collection; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; @@ -38,6 +39,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -47,8 +49,13 @@ import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.KeyedWorkItems; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.util.state.TimerInternalsFactory; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import 
org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; @@ -120,6 +127,12 @@ public class SplittableParDoTest { .setIsBoundedInternal(PCollection.IsBounded.BOUNDED); } + private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<String>() {}; + + private ParDo.BoundMulti<Integer, String> makeParDo(DoFn<Integer, String> fn) { + return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty()); + } + @Test public void testBoundednessForBoundedFn() { Pipeline pipeline = TestPipeline.create(); @@ -128,14 +141,15 @@ public class SplittableParDoTest { "Applying a bounded SDF to a bounded collection produces a bounded collection", PCollection.IsBounded.BOUNDED, makeBoundedCollection(pipeline) - .apply("bounded to bounded", new SplittableParDo<>(boundedFn)) - .isBounded()); + .apply("bounded to bounded", new SplittableParDo<>(makeParDo(boundedFn))) + .get(MAIN_OUTPUT_TAG).isBounded()); assertEquals( "Applying a bounded SDF to an unbounded collection produces an unbounded collection", PCollection.IsBounded.UNBOUNDED, makeUnboundedCollection(pipeline) - .apply("bounded to unbounded", new SplittableParDo<>(boundedFn)) - .isBounded()); + .apply( + "bounded to unbounded", new SplittableParDo<>(makeParDo(boundedFn))) + .get(MAIN_OUTPUT_TAG).isBounded()); } @Test @@ -146,18 +160,27 @@ public class SplittableParDoTest { "Applying an unbounded SDF to a bounded collection produces a bounded collection", PCollection.IsBounded.UNBOUNDED, makeBoundedCollection(pipeline) - .apply("unbounded to bounded", new SplittableParDo<>(unboundedFn)) - .isBounded()); + .apply( + "unbounded to bounded", + new SplittableParDo<>(makeParDo(unboundedFn))) + .get(MAIN_OUTPUT_TAG).isBounded()); assertEquals( "Applying an unbounded SDF to an unbounded collection produces an unbounded collection", PCollection.IsBounded.UNBOUNDED, makeUnboundedCollection(pipeline) - 
.apply("unbounded to unbounded", new SplittableParDo<>(unboundedFn)) - .isBounded()); + .apply( + "unbounded to unbounded", + new SplittableParDo<>(makeParDo(unboundedFn))) + .get(MAIN_OUTPUT_TAG).isBounded()); } // ------------------------------- Tests for ProcessFn --------------------------------- + enum WindowExplosion { + EXPLODE_WINDOWS, + DO_NOT_EXPLODE_WINDOWS + } + /** * A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but possibly over multiple * {@link DoFn.ProcessElement} calls). @@ -179,6 +202,46 @@ public class SplittableParDoTest { new SplittableParDo.ProcessFn<>( fn, inputCoder, restrictionCoder, IntervalWindow.getCoder()); this.tester = DoFnTester.of(processFn); + processFn.setStateInternalsFactory( + new StateInternalsFactory<String>() { + @Override + public StateInternals<String> stateInternalsForKey(String key) { + return tester.getStateInternals(); + } + }); + processFn.setTimerInternalsFactory( + new TimerInternalsFactory<String>() { + @Override + public TimerInternals timerInternalsForKey(String key) { + return tester.getTimerInternals(); + } + }); + processFn.setOutputWindowedValue( + new OutputWindowedValue<OutputT>() { + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + tester + .getMutableOutput(tester.getMainOutputTag()) + .add(WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + tester.getMutableOutput(tag).add(WindowedValue.of(output, timestamp, windows, pane)); + } + }); + // Do not clone since ProcessFn references non-serializable DoFnTester itself + // through the state/timer/output callbacks. 
+ this.tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE); this.tester.startBundle(); this.tester.advanceProcessingTime(currentProcessingTime); @@ -192,12 +255,24 @@ public class SplittableParDoTest { ElementAndRestriction.of(element, restriction), currentProcessingTime, GlobalWindow.INSTANCE, - PaneInfo.ON_TIME_AND_ONLY_FIRING)); + PaneInfo.ON_TIME_AND_ONLY_FIRING), + WindowExplosion.DO_NOT_EXPLODE_WINDOWS); } - void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue) + void startElement( + WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue, + WindowExplosion explosion) throws Exception { - tester.processElement(KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue))); + switch (explosion) { + case EXPLODE_WINDOWS: + tester.processElement( + KeyedWorkItems.elementsWorkItem("key", windowedValue.explodeWindows())); + break; + case DO_NOT_EXPLODE_WINDOWS: + tester.processElement( + KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue))); + break; + } } /** @@ -253,9 +328,6 @@ public class SplittableParDoTest { DoFn<Integer, String> fn = new ToStringFn(); Instant base = Instant.now(); - ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester = - new ProcessFnTester<>( - base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class)); IntervalWindow w1 = new IntervalWindow( @@ -267,20 +339,26 @@ public class SplittableParDoTest { new IntervalWindow( base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3))); - tester.startElement( - WindowedValue.of( - ElementAndRestriction.of(42, new SomeRestriction()), - base, - Arrays.asList(w1, w2, w3), - PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) { - assertEquals( - Arrays.asList( - TimestampedValue.of("42a", base), - TimestampedValue.of("42b", base), - TimestampedValue.of("42c", base)), - 
tester.peekOutputElementsInWindow(w)); + for (WindowExplosion explosion : WindowExplosion.values()) { + ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester = + new ProcessFnTester<>( + base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class)); + tester.startElement( + WindowedValue.of( + ElementAndRestriction.of(42, new SomeRestriction()), + base, + Arrays.asList(w1, w2, w3), + PaneInfo.ON_TIME_AND_ONLY_FIRING), + explosion); + + for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) { + assertEquals( + Arrays.asList( + TimestampedValue.of("42a", base), + TimestampedValue.of("42b", base), + TimestampedValue.of("42c", base)), + tester.peekOutputElementsInWindow(w)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java index 680a971..04becd7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java @@ -17,48 +17,23 @@ */ package org.apache.beam.runners.direct; -import static com.google.common.base.Preconditions.checkArgument; - -import org.apache.beam.runners.core.GBKIntoKeyedWorkItems; -import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItemCoder; -import org.apache.beam.sdk.util.WindowingStrategy; import 
org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -/** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */ +/** + * Provides an implementation of {@link SplittableParDo.GBKIntoKeyedWorkItems} for the Direct + * Runner. + */ class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT> implements PTransformOverrideFactory< PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>, - GBKIntoKeyedWorkItems<KeyT, InputT>> { + SplittableParDo.GBKIntoKeyedWorkItems<KeyT, InputT>> { @Override public PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> - override(GBKIntoKeyedWorkItems<KeyT, InputT> transform) { - return new DirectGBKIntoKeyedWorkItems<>(transform.getName()); - } - - /** The Direct Runner specific implementation of {@link GBKIntoKeyedWorkItems}. */ - private static class DirectGBKIntoKeyedWorkItems<KeyT, InputT> - extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> { - DirectGBKIntoKeyedWorkItems(String name) { - super(name); - } - - @Override - public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) { - checkArgument(input.getCoder() instanceof KvCoder); - KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder(); - return input - // TODO: Perhaps windowing strategy should instead be set by ReifyTAW, or by DGBKO - .setWindowingStrategyInternal(WindowingStrategy.globalDefault()) - .apply(new DirectGroupByKey.DirectGroupByKeyOnly<KeyT, InputT>()) - .setCoder( - KeyedWorkItemCoder.of( - kvCoder.getKeyCoder(), - kvCoder.getValueCoder(), - input.getWindowingStrategy().getWindowFn().windowCoder())); - } + override(SplittableParDo.GBKIntoKeyedWorkItems<KeyT, InputT> transform) { + return new DirectGroupByKey.DirectGroupByKeyOnly<>(); } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java index 219314a..efee801 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java @@ -65,7 +65,7 @@ class DirectGroupByKey<K, V> KeyedWorkItemCoder.of( inputCoder.getKeyCoder(), inputCoder.getValueCoder(), - input.getWindowingStrategy().getWindowFn().windowCoder())) + inputWindowingStrategy.getWindowFn().windowCoder())) // Group each key's values by window, merging windows as needed. .apply( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index f71e109..82de9ab 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -30,7 +30,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; -import org.apache.beam.runners.core.GBKIntoKeyedWorkItems; +import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import 
org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; @@ -88,7 +88,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { .put(ParDo.Bound.class, new ParDoSingleViaMultiOverrideFactory()) .put(ParDo.BoundMulti.class, new ParDoMultiOverrideFactory()) .put( - GBKIntoKeyedWorkItems.class, + SplittableParDo.GBKIntoKeyedWorkItems.class, new DirectGBKIntoKeyedWorkItemsOverrideFactory()) .build(); @@ -307,8 +307,8 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { @SuppressWarnings("rawtypes") KeyedPValueTrackingVisitor keyedPValueVisitor = KeyedPValueTrackingVisitor.create( - ImmutableSet.<Class<? extends PTransform>>of( - GBKIntoKeyedWorkItems.class, + ImmutableSet.of( + SplittableParDo.GBKIntoKeyedWorkItems.class, DirectGroupByKeyOnly.class, DirectGroupAlsoByWindow.class)); pipeline.traverseTopologically(keyedPValueVisitor); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java index 67d957c..cd644a6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java @@ -56,9 +56,9 @@ class DoFnLifecycleManager { thrownOnTeardown = new ConcurrentHashMap<>(); } - public DoFn<?, ?> get() throws Exception { + public <InputT, OutputT> DoFn<InputT, OutputT> get() throws Exception { Thread currentThread = Thread.currentThread(); - return outstanding.get(currentThread); + return (DoFn<InputT, OutputT>) outstanding.get(currentThread); } public void remove() throws Exception { 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 750e5f1..504ddc4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.direct; import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -58,9 +57,9 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>(); for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) { outputBundles.put( - outputEntry.getKey(), - evaluationContext.createBundle(outputEntry.getValue())); + outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue())); } + BundleOutputManager outputManager = BundleOutputManager.create(outputBundles); ReadyCheckingSideInputReader sideInputReader = evaluationContext.createSideInputReader(sideInputs); @@ -69,7 +68,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { evaluationContext.getPipelineOptions(), fn, sideInputReader, - BundleOutputManager.create(outputBundles), + outputManager, mainOutputTag, sideOutputTags, stepContext, @@ -85,12 +84,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { } return new ParDoEvaluator<>( - evaluationContext, - runner, - application, - aggregatorChanges, - outputBundles.values(), - stepContext); 
+ evaluationContext, runner, application, aggregatorChanges, outputManager, stepContext); } //////////////////////////////////////////////////////////////////////////////////////////////// @@ -99,7 +93,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner; private final AppliedPTransform<?, ?, ?> transform; private final AggregatorContainer.Mutator aggregatorChanges; - private final Collection<UncommittedBundle<?>> outputBundles; + private final BundleOutputManager outputManager; private final DirectStepContext stepContext; private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements; @@ -109,17 +103,21 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { PushbackSideInputDoFnRunner<InputT, ?> fnRunner, AppliedPTransform<?, ?, ?> transform, AggregatorContainer.Mutator aggregatorChanges, - Collection<UncommittedBundle<?>> outputBundles, + BundleOutputManager outputManager, DirectStepContext stepContext) { this.evaluationContext = evaluationContext; this.fnRunner = fnRunner; this.transform = transform; - this.outputBundles = outputBundles; + this.outputManager = outputManager; this.stepContext = stepContext; this.aggregatorChanges = aggregatorChanges; this.unprocessedElements = ImmutableList.builder(); } + public BundleOutputManager getOutputManager() { + return outputManager; + } + @Override public void processElement(WindowedValue<InputT> element) { try { @@ -147,7 +145,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { resultBuilder = StepTransformResult.withoutHold(transform); } return resultBuilder - .addOutput(outputBundles) + .addOutput(outputManager.bundles.values()) .withTimerUpdate(stepContext.getTimerUpdate()) .withAggregatorChanges(aggregatorChanges) .addUnprocessedElements(unprocessedElements.build()) 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 02e034a..ec5dc2c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -57,6 +57,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator public <T> TransformEvaluator<T> forApplication( AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception { + @SuppressWarnings("unchecked") AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> parDoApplication = (AppliedPTransform< @@ -93,13 +94,12 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator */ @SuppressWarnings({"unchecked", "rawtypes"}) TransformEvaluator<InputT> createEvaluator( - AppliedPTransform<PCollection<?>, PCollectionTuple, ?> - application, - StructuralKey<?> inputBundleKey, - DoFn<InputT, OutputT> doFn, - List<PCollectionView<?>> sideInputs, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags) + AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application, + StructuralKey<?> inputBundleKey, + DoFn<InputT, OutputT> doFn, + List<PCollectionView<?>> sideInputs, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> sideOutputTags) throws Exception { String stepName = evaluationContext.getStepName(application); DirectStepContext stepContext = @@ -107,21 +107,40 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator .getExecutionContext(application, 
inputBundleKey) .getOrCreateStepContext(stepName, stepName); - DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn); + DoFnLifecycleManager fnManager = getManagerForCloneOf(doFn); + return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping( + createParDoEvaluator( + application, + sideInputs, + mainOutputTag, + sideOutputTags, + stepContext, + fnManager.<InputT, OutputT>get(), + fnManager), + fnManager); + } + + ParDoEvaluator<InputT, OutputT> createParDoEvaluator( + AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application, + List<PCollectionView<?>> sideInputs, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> sideOutputTags, + DirectStepContext stepContext, + DoFn<InputT, OutputT> fn, + DoFnLifecycleManager fnManager) + throws Exception { try { - return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping( - ParDoEvaluator.<InputT, OutputT>create( - evaluationContext, - stepContext, - application, - application.getInput().getWindowingStrategy(), - fnManager.get(), - sideInputs, - mainOutputTag, - sideOutputTags, - application.getOutput().getAll()), - fnManager); + return ParDoEvaluator.create( + evaluationContext, + stepContext, + application, + application.getInput().getWindowingStrategy(), + fn, + sideInputs, + mainOutputTag, + sideOutputTags, + application.getOutput().getAll()); } catch (Exception e) { try { fnManager.remove(); @@ -134,4 +153,8 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator throw e; } } + + public DoFnLifecycleManager getManagerForCloneOf(DoFn<?, ?> fn) { + return fnClones.getUnchecked(fn); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 8db5159..9c9256d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -49,7 +49,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT> DoFn<InputT, OutputT> fn = transform.getNewFn(); DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); if (signature.processElement().isSplittable()) { - return new SplittableParDo(fn); + return new SplittableParDo(transform); } else if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java new file mode 100644 index 0000000..0eca710 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import java.util.Collection; +import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.ElementAndRestriction; +import org.apache.beam.runners.core.OutputWindowedValue; +import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.util.state.TimerInternalsFactory; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT> + implements TransformEvaluatorFactory { + private final ParDoEvaluatorFactory< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> + delegateFactory; + private final EvaluationContext evaluationContext; + + SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + this.delegateFactory = new 
ParDoEvaluatorFactory<>(evaluationContext); + } + + @Override + public <T> TransformEvaluator<T> forApplication( + AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception { + @SuppressWarnings({"unchecked", "rawtypes"}) + TransformEvaluator<T> evaluator = + (TransformEvaluator<T>) + createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle); + return evaluator; + } + + @Override + public void cleanup() throws Exception { + delegateFactory.cleanup(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private TransformEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> + createEvaluator( + AppliedPTransform< + PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>, + PCollectionTuple, SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT>> + application, + CommittedBundle<InputT> inputBundle) + throws Exception { + final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT> transform = + application.getTransform(); + + DoFnLifecycleManager fnManager = delegateFactory.getManagerForCloneOf(transform.getFn()); + + SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, ?> processFn = + transform.newProcessFn(fnManager.<InputT, OutputT>get()); + + String stepName = evaluationContext.getStepName(application); + final DirectExecutionContext.DirectStepContext stepContext = + evaluationContext + .getExecutionContext(application, inputBundle.getKey()) + .getOrCreateStepContext(stepName, stepName); + + ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> + parDoEvaluator = + delegateFactory.createParDoEvaluator( + application, + transform.getSideInputs(), + transform.getMainOutputTag(), + transform.getSideOutputTags().getAll(), + stepContext, + processFn, + fnManager); + + processFn.setStateInternalsFactory( + new StateInternalsFactory<String>() { + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public 
StateInternals<String> stateInternalsForKey(String key) { + return (StateInternals) stepContext.stateInternals(); + } + }); + + processFn.setTimerInternalsFactory( + new TimerInternalsFactory<String>() { + @Override + public TimerInternals timerInternalsForKey(String key) { + return stepContext.timerInternals(); + } + }); + + final OutputManager outputManager = parDoEvaluator.getOutputManager(); + processFn.setOutputWindowedValue( + new OutputWindowedValue<OutputT>() { + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + outputManager.output( + transform.getMainOutputTag(), WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane)); + } + }); + + return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index a4c462a..1ddf9f4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import 
org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; @@ -61,6 +62,10 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { .put( TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class, new TestStreamEvaluatorFactory(ctxt)) + // Runner-specific primitive used in expansion of SplittableParDo + .put( + SplittableParDo.ProcessElements.class, + new SplittableProcessElementsEvaluatorFactory<>(ctxt)) .build(); return new TransformEvaluatorRegistry(primitives); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java index c164ce6..f9e833f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.Serializable; import java.util.ArrayList; @@ -32,20 +33,28 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Create; 
import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Keys; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.MutableDateTime; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -66,6 +75,11 @@ public class SplittableDoFnTest { this.from = from; this.to = to; } + + @Override + public String toString() { + return "OffsetRange{" + "from=" + from + ", to=" + to + '}'; + } } private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> { @@ -140,11 +154,8 @@ public class SplittableDoFnTest { } } - @Ignore( - "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; " - + "It must be implemented as a primitive.") @Test - public void testPairWithIndexBasic() throws ClassNotFoundException { + public void testPairWithIndexBasic() { Pipeline p = TestPipeline.create(); p.getOptions().setRunner(DirectRunner.class); PCollection<KV<String, Integer>> res = @@ -167,11 +178,8 @@ public class SplittableDoFnTest { p.run(); } - @Ignore( - "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available 
in DoFn; " - + "It must be implemented as a primitive.") @Test - public void testPairWithIndexWindowedTimestamped() throws ClassNotFoundException { + public void testPairWithIndexWindowedTimestamped() { // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps // of elements in the input collection. Pipeline p = TestPipeline.create(); @@ -228,4 +236,172 @@ public class SplittableDoFnTest { } p.run(); } + + private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> { + private final PCollectionView<String> sideInput; + private final TupleTag<String> sideOutput; + + private SDFWithSideInputsAndOutputs( + PCollectionView<String> sideInput, TupleTag<String> sideOutput) { + this.sideInput = sideInput; + this.sideOutput = sideOutput; + } + + @ProcessElement + public void process(ProcessContext c, OffsetRangeTracker tracker) { + checkState(tracker.tryClaim(tracker.currentRestriction().from)); + String side = c.sideInput(sideInput); + c.output("main:" + side + ":" + c.element()); + c.sideOutput(sideOutput, "side:" + side + ":" + c.element()); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction(Integer value) { + return new OffsetRange(0, 1); + } + + @NewTracker + public OffsetRangeTracker newTracker(OffsetRange range) { + return new OffsetRangeTracker(range); + } + } + + @Test + public void testSideInputsAndOutputs() throws Exception { + Pipeline p = TestPipeline.create(); + p.getOptions().setRunner(DirectRunner.class); + + PCollectionView<String> sideInput = + p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton()); + TupleTag<String> mainOutputTag = new TupleTag<>("main"); + TupleTag<String> sideOutputTag = new TupleTag<>("side"); + + PCollectionTuple res = + p.apply("input", Create.of(0, 1, 2)) + .apply( + ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, sideOutputTag)) + .withSideInputs(sideInput) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + 
res.get(mainOutputTag).setCoder(StringUtf8Coder.of()); + res.get(sideOutputTag).setCoder(StringUtf8Coder.of()); + + PAssert.that(res.get(mainOutputTag)) + .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2")); + PAssert.that(res.get(sideOutputTag)) + .containsInAnyOrder(Arrays.asList("side:foo:0", "side:foo:1", "side:foo:2")); + + p.run(); + } + + @Test + public void testLateData() throws Exception { + Pipeline p = TestPipeline.create(); + p.getOptions().setRunner(DirectRunner.class); + + Instant base = Instant.now(); + + TestStream<String> stream = + TestStream.create(StringUtf8Coder.of()) + .advanceWatermarkTo(base) + .addElements("aa") + .advanceWatermarkTo(base.plus(Duration.standardSeconds(5))) + .addElements(TimestampedValue.of("bb", base.minus(Duration.standardHours(1)))) + .advanceProcessingTime(Duration.standardHours(1)) + .advanceWatermarkToInfinity(); + + PCollection<String> input = + p.apply(stream) + .apply( + Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))) + .withAllowedLateness(Duration.standardMinutes(1))); + + PCollection<KV<String, Integer>> afterSDF = + input + .apply(ParDo.of(new PairStringWithIndexToLength())) + .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())); + + PCollection<String> nonLate = + afterSDF.apply(GroupByKey.<String, Integer>create()).apply(Keys.<String>create()); + + // The splittable DoFn itself should not drop any data and act as pass-through. + PAssert.that(afterSDF) + .containsInAnyOrder( + Arrays.asList(KV.of("aa", 0), KV.of("aa", 1), KV.of("bb", 0), KV.of("bb", 1))); + + // But it should preserve the windowing strategy of the data, including allowed lateness: + // the follow-up GBK should drop the late data. 
+ assertEquals(afterSDF.getWindowingStrategy(), input.getWindowingStrategy()); + PAssert.that(nonLate).containsInAnyOrder("aa"); + + p.run(); + } + + private static class SDFWithLifecycle extends DoFn<String, String> { + private enum State { + BEFORE_SETUP, + OUTSIDE_BUNDLE, + INSIDE_BUNDLE, + TORN_DOWN + } + + private State state = State.BEFORE_SETUP; + + @ProcessElement + public void processElement(ProcessContext c, OffsetRangeTracker tracker) { + assertEquals(State.INSIDE_BUNDLE, state); + assertTrue(tracker.tryClaim(0)); + c.output(c.element()); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction(String value) { + return new OffsetRange(0, 1); + } + + @NewTracker + public OffsetRangeTracker newTracker(OffsetRange range) { + return new OffsetRangeTracker(range); + } + + @Setup + public void setUp() { + assertEquals(State.BEFORE_SETUP, state); + state = State.OUTSIDE_BUNDLE; + } + + @StartBundle + public void startBundle(Context c) { + assertEquals(State.OUTSIDE_BUNDLE, state); + state = State.INSIDE_BUNDLE; + } + + @FinishBundle + public void finishBundle(Context c) { + assertEquals(State.INSIDE_BUNDLE, state); + state = State.OUTSIDE_BUNDLE; + } + + @Teardown + public void tearDown() { + assertEquals(State.OUTSIDE_BUNDLE, state); + state = State.TORN_DOWN; + } + } + + @Test + public void testLifecycleMethods() throws Exception { + Pipeline p = TestPipeline.create(); + p.getOptions().setRunner(DirectRunner.class); + + PCollection<String> res = + p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new SDFWithLifecycle())); + + PAssert.that(res).containsInAnyOrder("a", "b", "c"); + + p.run(); + } + + // TODO (https://issues.apache.org/jira/browse/BEAM-988): Test that Splittable DoFn + // emits output immediately (i.e. has a pass-through trigger) regardless of input's + // windowing/triggering strategy. 
} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 3f1a3f9..7aabec9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -120,6 +120,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * should be in, throwing an exception if the {@code WindowFn} attempts * to access any information about the input element. The output element * will have a timestamp of negative infinity. + * + * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from + * {@link StartBundle} or {@link FinishBundle} methods. */ public abstract void output(OutputT output); @@ -142,6 +145,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * should be in, throwing an exception if the {@code WindowFn} attempts * to access any information about the input element except for the * timestamp. + * + * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from + * {@link StartBundle} or {@link FinishBundle} methods. */ public abstract void outputWithTimestamp(OutputT output, Instant timestamp); @@ -168,6 +174,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * to access any information about the input element. The output element * will have a timestamp of negative infinity. * + * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from + * {@link StartBundle} or {@link FinishBundle} methods. 
+ * * @see ParDo#withOutputTags */ public abstract <T> void sideOutput(TupleTag<T> tag, T output); @@ -192,6 +201,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * to access any information about the input element except for the * timestamp. * + * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from + * {@link StartBundle} or {@link FinishBundle} methods. + * * @see ParDo#withOutputTags */ public abstract <T> void sideOutputWithTimestamp( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index daa8a06..0c6043f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -140,6 +140,15 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { windowValues.put(window, value); } + @SuppressWarnings("unchecked") + public <K> StateInternals<K> getStateInternals() { + return (StateInternals<K>) stateInternals; + } + + public TimerInternals getTimerInternals() { + return timerInternals; + } + /** * When a {@link DoFnTester} should clone the {@link DoFn} under test and how it should manage * the lifecycle of the {@link DoFn}. @@ -321,7 +330,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { * */ public List<OutputT> peekOutputElements() { - // TODO: Should we return an unmodifiable list? 
return Lists.transform( peekOutputElementsWithTimestamp(), new Function<TimestampedValue<OutputT>, OutputT>() { @@ -344,7 +352,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { @Experimental public List<TimestampedValue<OutputT>> peekOutputElementsWithTimestamp() { // TODO: Should we return an unmodifiable list? - return Lists.transform(getOutput(mainOutputTag), + return Lists.transform(getImmutableOutput(mainOutputTag), new Function<WindowedValue<OutputT>, TimestampedValue<OutputT>>() { @Override @SuppressWarnings("unchecked") @@ -370,7 +378,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { TupleTag<OutputT> tag, BoundedWindow window) { ImmutableList.Builder<TimestampedValue<OutputT>> valuesBuilder = ImmutableList.builder(); - for (WindowedValue<OutputT> value : getOutput(tag)) { + for (WindowedValue<OutputT> value : getImmutableOutput(tag)) { if (value.getWindows().contains(window)) { valuesBuilder.add(TimestampedValue.of(value.getValue(), value.getTimestamp())); } @@ -384,7 +392,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { * @see #peekOutputElements */ public void clearOutputElements() { - peekOutputElements().clear(); + getMutableOutput(mainOutputTag).clear(); } /** @@ -425,7 +433,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { */ public <T> List<T> peekSideOutputElements(TupleTag<T> tag) { // TODO: Should we return an unmodifiable list? 
- return Lists.transform(getOutput(tag), + return Lists.transform(getImmutableOutput(tag), new Function<WindowedValue<T>, T>() { @SuppressWarnings("unchecked") @Override @@ -441,7 +449,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { * @see #peekSideOutputElements */ public <T> void clearSideOutputElements(TupleTag<T> tag) { - peekSideOutputElements(tag).clear(); + getMutableOutput(tag).clear(); } /** @@ -502,10 +510,25 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { return combiner.extractOutput(accumulator); } - private <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) { + private <T> List<WindowedValue<T>> getImmutableOutput(TupleTag<T> tag) { @SuppressWarnings({"unchecked", "rawtypes"}) List<WindowedValue<T>> elems = (List) outputs.get(tag); - return MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList()); + return ImmutableList.copyOf( + MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList())); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public <T> List<WindowedValue<T>> getMutableOutput(TupleTag<T> tag) { + List<WindowedValue<T>> outputList = (List) outputs.get(tag); + if (outputList == null) { + outputList = new ArrayList<>(); + outputs.put(tag, (List) outputList); + } + return outputList; + } + + public TupleTag<OutputT> getMainOutputTag() { + return mainOutputTag; } private TestContext createContext(OldDoFn<InputT, OutputT> fn) { @@ -590,17 +613,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } public <T> void noteOutput(TupleTag<T> tag, WindowedValue<T> output) { - getOutputList(tag).add(output); - } - - private <T> List<WindowedValue<T>> getOutputList(TupleTag<T> tag) { - @SuppressWarnings({"unchecked", "rawtypes"}) - List<WindowedValue<T>> outputList = (List) outputs.get(tag); - if (outputList == null) { - outputList = new ArrayList<>(); - outputs.put(tag, (List) outputList); - } - return outputList; + 
getMutableOutput(tag).add(output); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java new file mode 100644 index 0000000..b9c3d5e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.util.TimerInternals; + +/** + * A factory for providing {@link TimerInternals} for a particular key. + * + * <p>Because it will generally be embedded in a {@link org.apache.beam.sdk.transforms.DoFn DoFn}, + * albeit at execution time, it is marked {@link Serializable}. 
+ */ +@Experimental(Kind.STATE) +public interface TimerInternalsFactory<K> { + + /** Returns {@link TimerInternals} for the provided key. */ + TimerInternals timerInternalsForKey(K key); +}