http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java new file mode 100644 index 0000000..5fe17ad --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; +import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; +import org.apache.beam.sdk.util.state.TimerCallback; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Test utility that runs a {@link Trigger}, using in-memory stub implementation to provide + * the {@link StateInternals}. + * + * @param <W> The type of windows being used. + */ +public class TriggerTester<InputT, W extends BoundedWindow> { + + /** + * A {@link TriggerTester} specialized to {@link Integer} values, so elements and timestamps + * can be conflated. Today, triggers should not observed the element type, so this is the + * only trigger tester that needs to be used. + */ + public static class SimpleTriggerTester<W extends BoundedWindow> + extends TriggerTester<Integer, W> { + + private SimpleTriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception { + super(windowingStrategy); + } + + public void injectElements(int... values) throws Exception { + List<TimestampedValue<Integer>> timestampedValues = + Lists.newArrayListWithCapacity(values.length); + for (int value : values) { + timestampedValues.add(TimestampedValue.of(value, new Instant(value))); + } + injectElements(timestampedValues); + } + + public SimpleTriggerTester<W> withAllowedLateness(Duration allowedLateness) throws Exception { + return new SimpleTriggerTester<>( + windowingStrategy.withAllowedLateness(allowedLateness)); + } + } + + protected final WindowingStrategy<Object, W> windowingStrategy; + + private final TestInMemoryStateInternals<?> stateInternals = + new TestInMemoryStateInternals<Object>(null /* key */); + private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); + private final TriggerContextFactory<W> contextFactory; + private final WindowFn<Object, W> windowFn; + private final ActiveWindowSet<W> activeWindows; + private final Map<W, W> windowToMergeResult; + + /** + * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link Trigger} + * under test. + */ + private final ExecutableTrigger executableTrigger; + + /** + * A map from a window and trigger to whether that trigger is finished for the window. + */ + private final Map<W, FinishedTriggers> finishedSets; + + public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger( + Trigger trigger, WindowFn<Object, W> windowFn) + throws Exception { + WindowingStrategy<Object, W> windowingStrategy = + WindowingStrategy.of(windowFn).withTrigger(trigger) + // Merging requires accumulation mode or early firings can break up a session. + // Not currently an issue with the tester (because we never GC) but we don't want + // mystery failures due to violating this need. + .withMode(windowFn.isNonMerging() + ? AccumulationMode.DISCARDING_FIRED_PANES + : AccumulationMode.ACCUMULATING_FIRED_PANES); + + return new SimpleTriggerTester<>(windowingStrategy); + } + + public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> forAdvancedTrigger( + Trigger trigger, WindowFn<Object, W> windowFn) throws Exception { + WindowingStrategy<Object, W> strategy = + WindowingStrategy.of(windowFn).withTrigger(trigger) + // Merging requires accumulation mode or early firings can break up a session. + // Not currently an issue with the tester (because we never GC) but we don't want + // mystery failures due to violating this need. + .withMode(windowFn.isNonMerging() + ? AccumulationMode.DISCARDING_FIRED_PANES + : AccumulationMode.ACCUMULATING_FIRED_PANES); + + return new TriggerTester<>(strategy); + } + + protected TriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception { + this.windowingStrategy = windowingStrategy; + this.windowFn = windowingStrategy.getWindowFn(); + this.executableTrigger = windowingStrategy.getTrigger(); + this.finishedSets = new HashMap<>(); + + this.activeWindows = + windowFn.isNonMerging() + ? new NonMergingActiveWindowSet<W>() + : new MergingActiveWindowSet<W>(windowFn, stateInternals); + this.windowToMergeResult = new HashMap<>(); + + this.contextFactory = + new TriggerContextFactory<>(windowingStrategy.getWindowFn(), stateInternals, activeWindows); + } + + /** + * Instructs the trigger to clear its state for the given window. + */ + public void clearState(W window) throws Exception { + executableTrigger.invokeClear(contextFactory.base(window, + new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window))); + } + + /** + * Asserts that the trigger has actually cleared all of its state for the given window. Since + * the trigger under test is the root, this makes the assert for all triggers regardless + * of their position in the trigger tree. + */ + public void assertCleared(W window) { + for (StateNamespace untypedNamespace : stateInternals.getNamespacesInUse()) { + if (untypedNamespace instanceof WindowAndTriggerNamespace) { + @SuppressWarnings("unchecked") + WindowAndTriggerNamespace<W> namespace = (WindowAndTriggerNamespace<W>) untypedNamespace; + if (namespace.getWindow().equals(window)) { + Set<?> tagsInUse = stateInternals.getTagsInUse(namespace); + assertTrue("Trigger has not cleared tags: " + tagsInUse, tagsInUse.isEmpty()); + } + } + } + } + + /** + * Returns {@code true} if the {@link Trigger} under test is finished for the given window. + */ + public boolean isMarkedFinished(W window) { + FinishedTriggers finishedSet = finishedSets.get(window); + if (finishedSet == null) { + return false; + } + + return finishedSet.isFinished(executableTrigger); + } + + private StateNamespace windowNamespace(W window) { + return StateNamespaces.window(windowFn.windowCoder(), checkNotNull(window)); + } + + /** + * Advance the input watermark to the specified time, then advance the output watermark as far as + * possible. + */ + public void advanceInputWatermark(Instant newInputWatermark) throws Exception { + // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 + timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark); + } + + /** Advance the processing time to the specified time. */ + public void advanceProcessingTime(Instant newProcessingTime) throws Exception { + // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 + timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime); + } + + /** + * Inject all the timestamped values (after passing through the window function) as if they + * arrived in a single chunk of a bundle (or work-unit). + */ + @SafeVarargs + public final void injectElements(TimestampedValue<InputT>... values) throws Exception { + injectElements(Arrays.asList(values)); + } + + public final void injectElements(Collection<TimestampedValue<InputT>> values) throws Exception { + for (TimestampedValue<InputT> value : values) { + WindowTracing.trace("TriggerTester.injectElements: {}", value); + } + + List<WindowedValue<InputT>> windowedValues = Lists.newArrayListWithCapacity(values.size()); + + for (TimestampedValue<InputT> input : values) { + try { + InputT value = input.getValue(); + Instant timestamp = input.getTimestamp(); + Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>( + windowFn, value, timestamp, GlobalWindow.INSTANCE)); + + for (W window : assignedWindows) { + activeWindows.addActiveForTesting(window); + + // Today, triggers assume onTimer firing at the watermark time, whether or not they + // explicitly set the timer themselves. So this tester must set it. + timerInternals.setTimer( + TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME)); + } + + windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + for (WindowedValue<InputT> windowedValue : windowedValues) { + for (BoundedWindow untypedWindow : windowedValue.getWindows()) { + // SDK is responsible for type safety + @SuppressWarnings("unchecked") + W window = mergeResult((W) untypedWindow); + + Trigger.OnElementContext context = contextFactory.createOnElementContext(window, + new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(), + executableTrigger, getFinishedSet(window)); + + if (!context.trigger().isFinished()) { + executableTrigger.invokeOnElement(context); + } + } + } + } + + public boolean shouldFire(W window) throws Exception { + Trigger.TriggerContext context = contextFactory.base( + window, + new TestTimers(windowNamespace(window)), + executableTrigger, getFinishedSet(window)); + executableTrigger.getSpec().prefetchShouldFire(context.state()); + return executableTrigger.invokeShouldFire(context); + } + + public void fireIfShouldFire(W window) throws Exception { + Trigger.TriggerContext context = contextFactory.base( + window, + new TestTimers(windowNamespace(window)), + executableTrigger, getFinishedSet(window)); + + executableTrigger.getSpec().prefetchShouldFire(context.state()); + if (executableTrigger.invokeShouldFire(context)) { + executableTrigger.getSpec().prefetchOnFire(context.state()); + executableTrigger.invokeOnFire(context); + if (context.trigger().isFinished()) { + activeWindows.remove(window); + executableTrigger.invokeClear(context); + } + } + } + + public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, boolean value) { + getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex), value); + } + + /** + * Invokes merge from the {@link WindowFn} a single time and passes the resulting merge + * events on to the trigger under test. Does not persist the fact that merging happened, + * since it is just to test the trigger's {@code OnMerge} method. + */ + public final void mergeWindows() throws Exception { + windowToMergeResult.clear(); + activeWindows.merge(new MergeCallback<W>() { + @Override + public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {} + + @Override + public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception { + List<W> activeToBeMerged = new ArrayList<W>(); + for (W window : toBeMerged) { + windowToMergeResult.put(window, mergeResult); + if (activeWindows.isActive(window)) { + activeToBeMerged.add(window); + } + } + Map<W, FinishedTriggers> mergingFinishedSets = + Maps.newHashMapWithExpectedSize(activeToBeMerged.size()); + for (W oldWindow : activeToBeMerged) { + mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow)); + } + executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult, + new TestTimers(windowNamespace(mergeResult)), executableTrigger, + getFinishedSet(mergeResult), mergingFinishedSets)); + timerInternals.setTimer(TimerData.of( + windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME)); + } + }); + } + + public W mergeResult(W window) { + W result = windowToMergeResult.get(window); + return result == null ? window : result; + } + + private FinishedTriggers getFinishedSet(W window) { + FinishedTriggers finishedSet = finishedSets.get(window); + if (finishedSet == null) { + finishedSet = FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()); + finishedSets.put(window, finishedSet); + } + return finishedSet; + } + + private static class TestAssignContext<W extends BoundedWindow> + extends WindowFn<Object, W>.AssignContext { + private Object element; + private Instant timestamp; + private BoundedWindow window; + + public TestAssignContext( + WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) { + windowFn.super(); + this.element = element; + this.timestamp = timestamp; + this.window = window; + } + + @Override + public Object element() { + return element; + } + + @Override + public Instant timestamp() { + return timestamp; + } + + @Override + public BoundedWindow window() { + return window; + } + } + + private class TestTimers implements Timers { + private final StateNamespace namespace; + + public TestTimers(StateNamespace namespace) { + checkArgument(namespace instanceof WindowNamespace); + this.namespace = namespace; + } + + @Override + public void setTimer(Instant timestamp, TimeDomain timeDomain) { + timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain)); + } + + @Override + public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { + timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain)); + } + + @Override + public Instant currentProcessingTime() { + return timerInternals.currentProcessingTime(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timerInternals.currentSynchronizedProcessingTime(); + } + + @Override + public Instant currentEventTime() { + return timerInternals.currentInputWatermarkTime(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java deleted file mode 100644 index cc8c97f..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.base.Joiner; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.ExecutableTrigger; -import org.joda.time.Instant; - -/** - * Create a {@link Trigger} that fires and finishes once after all of its sub-triggers have fired. - */ -@Experimental(Experimental.Kind.TRIGGER) -public class AfterAll extends OnceTrigger { - - private AfterAll(List<Trigger> subTriggers) { - super(subTriggers); - checkArgument(subTriggers.size() > 1); - } - - /** - * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers. - */ - public static OnceTrigger of(OnceTrigger... triggers) { - return new AfterAll(Arrays.<Trigger>asList(triggers)); - } - - @Override - public void onElement(OnElementContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) { - // Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH. - // invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH. - subTrigger.invokeOnElement(c); - } - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - subTrigger.invokeOnMerge(c); - } - boolean allFinished = true; - for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) { - allFinished &= c.forTrigger(subTrigger1).trigger().isFinished(); - } - c.trigger().setFinished(allFinished); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - // This trigger will fire after the latest of its sub-triggers. - Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE; - for (Trigger subTrigger : subTriggers) { - Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window); - if (deadline.isBefore(subDeadline)) { - deadline = subDeadline; - } - } - return deadline; - } - - @Override - public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return new AfterAll(continuationTriggers); - } - - /** - * {@inheritDoc} - * - * @return {@code true} if all subtriggers return {@code true}. - */ - @Override - public boolean shouldFire(TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { - if (!context.forTrigger(subtrigger).trigger().isFinished() - && !subtrigger.invokeShouldFire(context)) { - return false; - } - } - return true; - } - - /** - * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire} - * because they all must be ready to fire. - */ - @Override - public void onOnlyFiring(TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { - subtrigger.invokeOnFire(context); - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder("AfterAll.of("); - Joiner.on(", ").appendTo(builder, subTriggers); - builder.append(")"); - - return builder.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java deleted file mode 100644 index c4bc946..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import com.google.common.collect.ImmutableList; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.List; -import java.util.Locale; -import java.util.Objects; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.InstantCoder; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.CombiningState; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateMerging; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.joda.time.format.PeriodFormat; -import org.joda.time.format.PeriodFormatter; - -/** - * A base class for triggers that happen after a processing time delay from the arrival - * of the first element in a pane. - * - * <p>This class is for internal use only and may change at any time. - */ -@Experimental(Experimental.Kind.TRIGGER) -public abstract class AfterDelayFromFirstElement extends OnceTrigger { - - protected static final List<SerializableFunction<Instant, Instant>> IDENTITY = - ImmutableList.<SerializableFunction<Instant, Instant>>of(); - - protected static final StateTag<Object, AccumulatorCombiningState<Instant, - Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG = - StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( - "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder())); - - private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH); - - /** - * To complete an implementation, return the desired time from the TriggerContext. - */ - @Nullable - public abstract Instant getCurrentTime(Trigger.TriggerContext context); - - /** - * To complete an implementation, return a new instance like this one, but incorporating - * the provided timestamp mapping functions. Generally should be used by calling the - * constructor of this class from the constructor of the subclass. - */ - protected abstract AfterDelayFromFirstElement newWith( - List<SerializableFunction<Instant, Instant>> transform); - - /** - * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed in sequence. The - * overall mapping for an instance `instance` is `m_n(... m3(m2(m1(instant))`, - * implemented via #computeTargetTimestamp - */ - protected final List<SerializableFunction<Instant, Instant>> timestampMappers; - - private final TimeDomain timeDomain; - - public AfterDelayFromFirstElement( - TimeDomain timeDomain, - List<SerializableFunction<Instant, Instant>> timestampMappers) { - super(null); - this.timestampMappers = timestampMappers; - this.timeDomain = timeDomain; - } - - private Instant getTargetTimestamp(OnElementContext c) { - return computeTargetTimestamp(c.currentProcessingTime()); - } - - /** - * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater - * than the timestamp. - * - * <p>TODO: Consider sharing this with FixedWindows, and bring over the equivalent of - * CalendarWindows. - */ - public AfterDelayFromFirstElement alignedTo(final Duration size, final Instant offset) { - return newWith(new AlignFn(size, offset)); - } - - /** - * Aligns the time to be the smallest multiple of {@code size} greater than the timestamp - * since the epoch. - */ - public AfterDelayFromFirstElement alignedTo(final Duration size) { - return alignedTo(size, new Instant(0)); - } - - /** - * Adds some delay to the original target time. - * - * @param delay the delay to add - * @return An updated time trigger that will wait the additional time before firing. - */ - public AfterDelayFromFirstElement plusDelayOf(final Duration delay) { - return newWith(new DelayFn(delay)); - } - - /** - * @deprecated This will be removed in the next major version. Please use only - * {@link #plusDelayOf} and {@link #alignedTo}. - */ - @Deprecated - public OnceTrigger mappedTo(SerializableFunction<Instant, Instant> timestampMapper) { - return newWith(timestampMapper); - } - - @Override - public boolean isCompatible(Trigger other) { - if (!getClass().equals(other.getClass())) { - return false; - } - - AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other; - return this.timestampMappers.equals(that.timestampMappers); - } - - - private AfterDelayFromFirstElement newWith( - SerializableFunction<Instant, Instant> timestampMapper) { - return newWith( - ImmutableList.<SerializableFunction<Instant, Instant>>builder() - .addAll(timestampMappers) - .add(timestampMapper) - .build()); - } - - @Override - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = - "prefetch side effect") - public void prefetchOnElement(StateAccessor<?> state) { - state.access(DELAYED_UNTIL_TAG).readLater(); - } - - @Override - public void onElement(OnElementContext c) throws Exception { - CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG); - Instant oldDelayUntil = delayUntilState.read(); - - // Since processing time can only advance, resulting in target wake-up times we would - // ignore anyhow, we don't bother with it if it is already set. - if (oldDelayUntil != null) { - return; - } - - Instant targetTimestamp = getTargetTimestamp(c); - delayUntilState.add(targetTimestamp); - c.setTimer(targetTimestamp, timeDomain); - } - - @Override - public void prefetchOnMerge(MergingStateAccessor<?, ?> state) { - super.prefetchOnMerge(state); - StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG); - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - // NOTE: We could try to delete all timers which are still active, but we would - // need access to a timer context for each merging window. - // for (CombiningValueStateInternal<Instant, Combine.Holder<Instant>, Instant> state : - // c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) { - // Instant timestamp = state.get().read(); - // if (timestamp != null) { - // <context for merging window>.deleteTimer(timestamp, timeDomain); - // } - // } - // Instead let them fire and be ignored. - - // If the trigger is already finished, there is no way it will become re-activated - if (c.trigger().isFinished()) { - StateMerging.clear(c.state(), DELAYED_UNTIL_TAG); - // NOTE: We do not attempt to delete the timers. - return; - } - - // Determine the earliest point across all the windows, and delay to that. - StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG); - - Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read(); - if (earliestTargetTime != null) { - c.setTimer(earliestTargetTime, timeDomain); - } - } - - @Override - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = - "prefetch side effect") - public void prefetchShouldFire(StateAccessor<?> state) { - state.access(DELAYED_UNTIL_TAG).readLater(); - } - - @Override - public void clear(TriggerContext c) throws Exception { - c.state().access(DELAYED_UNTIL_TAG).clear(); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read(); - return delayedUntil != null - && getCurrentTime(context) != null - && getCurrentTime(context).isAfter(delayedUntil); - } - - @Override - protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { - clear(context); - } - - protected Instant computeTargetTimestamp(Instant time) { - Instant result = time; - for (SerializableFunction<Instant, Instant> timestampMapper : timestampMappers) { - result = timestampMapper.apply(result); - } - return result; - } - - /** - * A {@link SerializableFunction} to delay the timestamp at which this triggers fires. - */ - private static final class DelayFn implements SerializableFunction<Instant, Instant> { - private final Duration delay; - - public DelayFn(Duration delay) { - this.delay = delay; - } - - @Override - public Instant apply(Instant input) { - return input.plus(delay); - } - - @Override - public boolean equals(Object object) { - if (object == this) { - return true; - } - - if (!(object instanceof DelayFn)) { - return false; - } - - return this.delay.equals(((DelayFn) object).delay); - } - - @Override - public int hashCode() { - return Objects.hash(delay); - } - - @Override - public String toString() { - return PERIOD_FORMATTER.print(delay.toPeriod()); - } - } - - /** - * A {@link SerializableFunction} to align an instant to the nearest interval boundary. - */ - static final class AlignFn implements SerializableFunction<Instant, Instant> { - private final Duration size; - private final Instant offset; - - - /** - * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater - * than the timestamp. - */ - public AlignFn(Duration size, Instant offset) { - this.size = size; - this.offset = offset; - } - - @Override - public Instant apply(Instant point) { - long millisSinceStart = new Duration(offset, point).getMillis() % size.getMillis(); - return millisSinceStart == 0 ? point : point.plus(size).minus(millisSinceStart); - } - - @Override - public boolean equals(Object object) { - if (object == this) { - return true; - } - - if (!(object instanceof AlignFn)) { - return false; - } - - AlignFn other = (AlignFn) object; - return other.size.equals(this.size) - && other.offset.equals(this.offset); - } - - @Override - public int hashCode() { - return Objects.hash(size, offset); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java deleted file mode 100644 index 629c640..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.base.Joiner; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.util.ExecutableTrigger; -import org.joda.time.Instant; - -/** - * A composite {@link Trigger} that executes its sub-triggers in order. - * Only one sub-trigger is executing at a time, - * and any time it fires the {@code AfterEach} fires. When the currently executing - * sub-trigger finishes, the {@code AfterEach} starts executing the next sub-trigger. - * - * <p>{@code AfterEach.inOrder(t1, t2, ...)} finishes when all of the sub-triggers have finished. - * - * <p>The following properties hold: - * <ul> - * <li> {@code AfterEach.inOrder(AfterEach.inOrder(a, b), c)} behaves the same as - * {@code AfterEach.inOrder(a, b, c)} and {@code AfterEach.inOrder(a, AfterEach.inOrder(b, c)}. - * <li> {@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same as - * {@code Repeatedly.forever(a)}, since the repeated trigger never finishes. - * </ul> - */ -@Experimental(Experimental.Kind.TRIGGER) -public class AfterEach extends Trigger { - - private AfterEach(List<Trigger> subTriggers) { - super(subTriggers); - checkArgument(subTriggers.size() > 1); - } - - /** - * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers. - */ - @SafeVarargs - public static Trigger inOrder(Trigger... triggers) { - return new AfterEach(Arrays.<Trigger>asList(triggers)); - } - - @Override - public void onElement(OnElementContext c) throws Exception { - if (!c.trigger().isMerging()) { - // If merges are not possible, we need only run the first unfinished subtrigger - c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c); - } else { - // If merges are possible, we need to run all subtriggers in parallel - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - // Even if the subTrigger is done, it may be revived via merging and must have - // adequate state. - subTrigger.invokeOnElement(c); - } - } - } - - @Override - public void onMerge(OnMergeContext context) throws Exception { - // If merging makes a subtrigger no-longer-finished, it will automatically - // begin participating in shouldFire and onFire appropriately. - - // All the following triggers are retroactively "not started" but that is - // also automatic because they are cleared whenever this trigger - // fires. - boolean priorTriggersAllFinished = true; - for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) { - if (priorTriggersAllFinished) { - subTrigger.invokeOnMerge(context); - priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished(); - } else { - subTrigger.invokeClear(context); - } - } - updateFinishedState(context); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - // This trigger will fire at least once when the first trigger in the sequence - // fires at least once. - return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window); - } - - @Override - public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return Repeatedly.forever(new AfterFirst(continuationTriggers)); - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - ExecutableTrigger firstUnfinished = context.trigger().firstUnfinishedSubTrigger(); - return firstUnfinished.invokeShouldFire(context); - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { - context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context); - - // Reset all subtriggers if in a merging context; any may be revived by merging so they are - // all run in parallel for each pending pane. - if (context.trigger().isMerging()) { - for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) { - subTrigger.invokeClear(context); - } - } - - updateFinishedState(context); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder("AfterEach.inOrder("); - Joiner.on(", ").appendTo(builder, subTriggers); - builder.append(")"); - - return builder.toString(); - } - - private void updateFinishedState(TriggerContext context) { - context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java deleted file mode 100644 index 6b06cfa..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.base.Joiner; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.ExecutableTrigger; -import org.joda.time.Instant; - -/** - * Create a composite {@link Trigger} that fires once after at least one of its sub-triggers have - * fired. - */ -@Experimental(Experimental.Kind.TRIGGER) -public class AfterFirst extends OnceTrigger { - - AfterFirst(List<Trigger> subTriggers) { - super(subTriggers); - checkArgument(subTriggers.size() > 1); - } - - /** - * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers. - */ - public static OnceTrigger of(OnceTrigger... triggers) { - return new AfterFirst(Arrays.<Trigger>asList(triggers)); - } - - @Override - public void onElement(OnElementContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - subTrigger.invokeOnElement(c); - } - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - subTrigger.invokeOnMerge(c); - } - updateFinishedStatus(c); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - // This trigger will fire after the earliest of its sub-triggers. - Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (Trigger subTrigger : subTriggers) { - Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window); - if (deadline.isAfter(subDeadline)) { - deadline = subDeadline; - } - } - return deadline; - } - - @Override - public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return new AfterFirst(continuationTriggers); - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { - if (context.forTrigger(subtrigger).trigger().isFinished() - || subtrigger.invokeShouldFire(context)) { - return true; - } - } - return false; - } - - @Override - protected void onOnlyFiring(TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { - TriggerContext subContext = context.forTrigger(subtrigger); - if (subtrigger.invokeShouldFire(subContext)) { - // If the trigger is ready to fire, then do whatever it needs to do. - subtrigger.invokeOnFire(subContext); - } else { - // If the trigger is not ready to fire, it is nonetheless true that whatever - // pending pane it was tracking is now gone. - subtrigger.invokeClear(subContext); - } - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder("AfterFirst.of("); - Joiner.on(", ").appendTo(builder, subTriggers); - builder.append(")"); - - return builder.toString(); - } - - private void updateFinishedStatus(TriggerContext c) { - boolean anyFinished = false; - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - anyFinished |= c.forTrigger(subTrigger).trigger().isFinished(); - } - c.trigger().setFinished(anyFinished); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java deleted file mode 100644 index 8c128dd..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.List; -import java.util.Objects; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateMerging; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.joda.time.Instant; - -/** - * {@link Trigger}s that fire based on properties of the elements in the current pane. - */ -@Experimental(Experimental.Kind.TRIGGER) -public class AfterPane extends OnceTrigger { - -private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> - ELEMENTS_IN_PANE_TAG = - StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( - "count", VarLongCoder.of(), new Sum.SumLongFn())); - - private final int countElems; - - private AfterPane(int countElems) { - super(null); - this.countElems = countElems; - } - - /** - * Creates a trigger that fires when the pane contains at least {@code countElems} elements. - */ - public static AfterPane elementCountAtLeast(int countElems) { - return new AfterPane(countElems); - } - - @Override - public void onElement(OnElementContext c) throws Exception { - c.state().access(ELEMENTS_IN_PANE_TAG).add(1L); - } - - @Override - public void prefetchOnMerge(MergingStateAccessor<?, ?> state) { - super.prefetchOnMerge(state); - StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG); - } - - @Override - public void onMerge(OnMergeContext context) throws Exception { - // If we've already received enough elements and finished in some window, - // then this trigger is just finished. - if (context.trigger().finishedInAnyMergingWindow()) { - context.trigger().setFinished(true); - StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG); - return; - } - - // Otherwise, compute the sum of elements in all the active panes. - StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG); - } - - @Override - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = - "prefetch side effect") - public void prefetchShouldFire(StateAccessor<?> state) { - state.access(ELEMENTS_IN_PANE_TAG).readLater(); - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - long count = context.state().access(ELEMENTS_IN_PANE_TAG).read(); - return count >= countElems; - } - - @Override - public void clear(TriggerContext c) throws Exception { - c.state().access(ELEMENTS_IN_PANE_TAG).clear(); - } - - @Override - public boolean isCompatible(Trigger other) { - return this.equals(other); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - @Override - public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return AfterPane.elementCountAtLeast(1); - } - - @Override - public String toString() { - return "AfterPane.elementCountAtLeast(" + countElems + ")"; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof AfterPane)) { - return false; - } - AfterPane that = (AfterPane) obj; - return this.countElems == that.countElems; - } - - @Override - public int hashCode() { - return Objects.hash(countElems); - } - - @Override - protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { - clear(context); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java deleted file mode 100644 index f551118..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import java.util.List; -import java.util.Objects; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.TimeDomain; -import org.joda.time.Instant; - -/** - * {@code AfterProcessingTime} triggers fire based on the current processing time. They operate in - * the real-time domain. - * - * <p>The time at which to fire the timer can be adjusted via the methods in - * {@link AfterDelayFromFirstElement}, such as {@link AfterDelayFromFirstElement#plusDelayOf} or - * {@link AfterDelayFromFirstElement#alignedTo}. - */ -@Experimental(Experimental.Kind.TRIGGER) -public class AfterProcessingTime extends AfterDelayFromFirstElement { - - @Override - @Nullable - public Instant getCurrentTime(Trigger.TriggerContext context) { - return context.currentProcessingTime(); - } - - private AfterProcessingTime(List<SerializableFunction<Instant, Instant>> transforms) { - super(TimeDomain.PROCESSING_TIME, transforms); - } - - /** - * Creates a trigger that fires when the current processing time passes the processing time - * at which this trigger saw the first element in a pane. - */ - public static AfterProcessingTime pastFirstElementInPane() { - return new AfterProcessingTime(IDENTITY); - } - - @Override - protected AfterProcessingTime newWith( - List<SerializableFunction<Instant, Instant>> transforms) { - return new AfterProcessingTime(transforms); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - @Override - protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return new AfterSynchronizedProcessingTime(); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()"); - for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) { - builder - .append(".plusDelayOf(") - .append(delayFn) - .append(")"); - } - - return builder.toString(); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof AfterProcessingTime)) { - return false; - } - AfterProcessingTime that = (AfterProcessingTime) obj; - return Objects.equals(this.timestampMappers, that.timestampMappers); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), this.timestampMappers); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java deleted file mode 100644 index 59ece10..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import com.google.common.base.Objects; -import java.util.Collections; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.TimeDomain; -import org.joda.time.Instant; - -class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement { - - @Override - @Nullable - public Instant getCurrentTime(Trigger.TriggerContext context) { - return context.currentSynchronizedProcessingTime(); - } - - public AfterSynchronizedProcessingTime() { - super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - Collections.<SerializableFunction<Instant, Instant>>emptyList()); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - @Override - protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return this; - } - - @Override - public String toString() { - return "AfterSynchronizedProcessingTime.pastFirstElementInPane()"; - } - - @Override - public boolean equals(Object obj) { - return this == obj || obj instanceof AfterSynchronizedProcessingTime; - } - - @Override - public int hashCode() { - return Objects.hashCode(AfterSynchronizedProcessingTime.class); - } - - @Override - protected AfterSynchronizedProcessingTime - newWith(List<SerializableFunction<Instant, Instant>> transforms) { - // ignore transforms - return this; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java deleted file mode 100644 index e2463d8..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.ImmutableList; -import java.util.List; -import java.util.Objects; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.ExecutableTrigger; -import org.apache.beam.sdk.util.TimeDomain; -import org.joda.time.Instant; - -/** - * {@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a - * lower-bound, sometimes heuristically established, on event times that have been fully processed - * by the pipeline. - * - * <p>For sources that provide non-heuristic watermarks (e.g. - * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the - * watermark is a strict guarantee that no data with an event time earlier than - * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any - * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end - * of the window will be the last pane ever for that window. - * - * <p>For sources that provide heuristic watermarks (e.g. - * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the - * watermark itself becomes an <i>estimate</i> that no data with an event time earlier than that - * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can - * often be quite accurate, but the chance of seeing late data for any given window is non-zero. - * Thus, if absolute correctness over time is important to your use case, you may want to consider - * using a trigger that accounts for late data. The default trigger, - * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires - * once when the watermark passes the end of the window and then immediately therafter when any - * late data arrives, is one such example. - * - * <p>The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}. - * - * <p>Additionaly firings before or after the watermark can be requested by calling - * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)} or - * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)}. - */ -@Experimental(Experimental.Kind.TRIGGER) -public class AfterWatermark { - - private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()"; - - // Static factory class. - private AfterWatermark() {} - - /** - * Creates a trigger that fires when the watermark passes the end of the window. - */ - public static FromEndOfWindow pastEndOfWindow() { - return new FromEndOfWindow(); - } - - /** - * @see AfterWatermark - */ - public static class AfterWatermarkEarlyAndLate extends Trigger { - - private static final int EARLY_INDEX = 0; - private static final int LATE_INDEX = 1; - - private final OnceTrigger earlyTrigger; - @Nullable - private final OnceTrigger lateTrigger; - - @SuppressWarnings("unchecked") - public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) { - super(lateTrigger == null - ? ImmutableList.<Trigger>of(earlyTrigger) - : ImmutableList.<Trigger>of(earlyTrigger, lateTrigger)); - this.earlyTrigger = checkNotNull(earlyTrigger, "earlyTrigger should not be null"); - this.lateTrigger = lateTrigger; - } - - public Trigger withEarlyFirings(OnceTrigger earlyTrigger) { - return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); - } - - public Trigger withLateFirings(OnceTrigger lateTrigger) { - return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); - } - - @Override - public void onElement(OnElementContext c) throws Exception { - if (!c.trigger().isMerging()) { - // If merges can never happen, we just run the unfinished subtrigger - c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c); - } else { - // If merges can happen, we run for all subtriggers because they might be - // de-activated or re-activated - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - subTrigger.invokeOnElement(c); - } - } - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - // NOTE that the ReduceFnRunner will delete all end-of-window timers for the - // merged-away windows. - - ExecutableTrigger earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX); - // We check the early trigger to determine if we are still processing it or - // if the end of window has transitioned us to the late trigger - OnMergeContext earlyContext = c.forTrigger(earlySubtrigger); - - // If the early trigger is still active in any merging window then it is still active in - // the new merged window, because even if the merged window is "done" some pending elements - // haven't had a chance to fire. - if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) { - earlyContext.trigger().setFinished(false); - if (lateTrigger != null) { - ExecutableTrigger lateSubtrigger = c.trigger().subTrigger(LATE_INDEX); - OnMergeContext lateContext = c.forTrigger(lateSubtrigger); - lateContext.trigger().setFinished(false); - lateSubtrigger.invokeClear(lateContext); - } - } else { - // Otherwise the early trigger and end-of-window bit is done for good. - earlyContext.trigger().setFinished(true); - if (lateTrigger != null) { - c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c); - } - } - } - - @Override - public Trigger getContinuationTrigger() { - return new AfterWatermarkEarlyAndLate( - earlyTrigger.getContinuationTrigger(), - lateTrigger == null ? null : lateTrigger.getContinuationTrigger()); - } - - @Override - protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - throw new UnsupportedOperationException( - "Should not call getContinuationTrigger(List<Trigger>)"); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - // Even without an early or late trigger, we'll still produce a firing at the watermark. - return window.maxTimestamp(); - } - - private boolean endOfWindowReached(Trigger.TriggerContext context) { - return context.currentEventTime() != null - && context.currentEventTime().isAfter(context.window().maxTimestamp()); - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - if (!context.trigger().isFinished(EARLY_INDEX)) { - // We have not yet transitioned to late firings. - // We should fire if either the trigger is ready or we reach the end of the window. - return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context) - || endOfWindowReached(context); - } else if (lateTrigger == null) { - return false; - } else { - // We are running the late trigger - return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context); - } - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { - if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) { - onNonLateFiring(context); - } else if (lateTrigger != null) { - onLateFiring(context); - } else { - // all done - context.trigger().setFinished(true); - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(TO_STRING); - - if (!(earlyTrigger instanceof Never.NeverTrigger)) { - builder - .append(".withEarlyFirings(") - .append(earlyTrigger) - .append(")"); - } - - if (lateTrigger != null && !(lateTrigger instanceof Never.NeverTrigger)) { - builder - .append(".withLateFirings(") - .append(lateTrigger) - .append(")"); - } - - return builder.toString(); - } - - private void onNonLateFiring(Trigger.TriggerContext context) throws Exception { - // We have not yet transitioned to late firings. - ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX); - Trigger.TriggerContext earlyContext = context.forTrigger(earlySubtrigger); - - if (!endOfWindowReached(context)) { - // This is an early firing, since we have not arrived at the end of the window - // Implicitly repeats - earlySubtrigger.invokeOnFire(context); - earlySubtrigger.invokeClear(context); - earlyContext.trigger().setFinished(false); - } else { - // We have arrived at the end of the window; terminate the early trigger - // and clear out the late trigger's state - if (earlySubtrigger.invokeShouldFire(context)) { - earlySubtrigger.invokeOnFire(context); - } - earlyContext.trigger().setFinished(true); - earlySubtrigger.invokeClear(context); - - if (lateTrigger == null) { - // Done if there is no late trigger. - context.trigger().setFinished(true); - } else { - // If there is a late trigger, we transition to it, and need to clear its state - // because it was run in parallel. - context.trigger().subTrigger(LATE_INDEX).invokeClear(context); - } - } - - } - - private void onLateFiring(Trigger.TriggerContext context) throws Exception { - // We are firing the late trigger, with implicit repeat - ExecutableTrigger lateSubtrigger = context.trigger().subTrigger(LATE_INDEX); - lateSubtrigger.invokeOnFire(context); - // It is a OnceTrigger, so it must have finished; unfinished it and clear it - lateSubtrigger.invokeClear(context); - context.forTrigger(lateSubtrigger).trigger().setFinished(false); - } - } - - /** - * A watermark trigger targeted relative to the end of the window. - */ - public static class FromEndOfWindow extends OnceTrigger { - - private FromEndOfWindow() { - super(null); - } - - /** - * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever - * the given {@code Trigger} fires before the watermark has passed the end of the window. - */ - public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyFirings) { - checkNotNull(earlyFirings, "Must specify the trigger to use for early firings"); - return new AfterWatermarkEarlyAndLate(earlyFirings, null); - } - - /** - * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever - * the given {@code Trigger} fires after the watermark has passed the end of the window. - */ - public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateFirings) { - checkNotNull(lateFirings, "Must specify the trigger to use for late firings"); - return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings); - } - - @Override - public void onElement(OnElementContext c) throws Exception { - // We're interested in knowing when the input watermark passes the end of the window. - // (It is possible this has already happened, in which case the timer will be fired - // almost immediately). - c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - // NOTE that the ReduceFnRunner will delete all end-of-window timers for the - // merged-away windows. - - if (!c.trigger().finishedInAllMergingWindows()) { - // If the trigger is still active in any merging window then it is still active in the new - // merged window, because even if the merged window is "done" some pending elements haven't - // had a chance to fire - c.trigger().setFinished(false); - } else if (!endOfWindowReached(c)) { - // If the end of the new window has not been reached, then the trigger is active again. - c.trigger().setFinished(false); - } else { - // Otherwise it is done for good - c.trigger().setFinished(true); - } - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return window.maxTimestamp(); - } - - @Override - public FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTriggers) { - return this; - } - - @Override - public String toString() { - return TO_STRING; - } - - @Override - public boolean equals(Object obj) { - return obj instanceof FromEndOfWindow; - } - - @Override - public int hashCode() { - return Objects.hash(getClass()); - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return endOfWindowReached(context); - } - - private boolean endOfWindowReached(Trigger.TriggerContext context) { - return context.currentEventTime() != null - && context.currentEventTime().isAfter(context.window().maxTimestamp()); - } - - @Override - protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java deleted file mode 100644 index d6b72ef..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import java.util.List; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.util.TimeDomain; -import org.joda.time.Instant; - -/** - * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. - * See {@link Repeatedly#forever} and {@link AfterWatermark#pastEndOfWindow} for more details. - */ -@Experimental(Experimental.Kind.TRIGGER) -public class DefaultTrigger extends Trigger{ - - private DefaultTrigger() { - super(null); - } - - /** - * Returns the default trigger. - */ - public static DefaultTrigger of() { - return new DefaultTrigger(); - } - - @Override - public void onElement(OnElementContext c) throws Exception { - // If the end of the window has already been reached, then we are already ready to fire - // and do not need to set a wake-up timer. - if (!endOfWindowReached(c)) { - c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); - } - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - // If the end of the window has already been reached, then we are already ready to fire - // and do not need to set a wake-up timer. - if (!endOfWindowReached(c)) { - c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); - } - } - - @Override - public void clear(TriggerContext c) throws Exception { } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return window.maxTimestamp(); - } - - @Override - public boolean isCompatible(Trigger other) { - // Semantically, all default triggers are identical - return other instanceof DefaultTrigger; - } - - @Override - public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return this; - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return endOfWindowReached(context); - } - - private boolean endOfWindowReached(Trigger.TriggerContext context) { - return context.currentEventTime() != null - && context.currentEventTime().isAfter(context.window().maxTimestamp()); - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java deleted file mode 100644 index 5f20465..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import java.util.List; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.joda.time.Instant; - -/** - * A trigger which never fires. - * - * <p>Using this trigger will only produce output when the watermark passes the end of the - * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed - * lateness}. - */ -public final class Never { - /** - * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey} - * when the {@link BoundedWindow} closes. - */ - public static OnceTrigger ever() { - // NeverTrigger ignores all inputs and is Window-type independent. - return new NeverTrigger(); - } - - // package-private in order to check identity for string formatting. - static class NeverTrigger extends OnceTrigger { - protected NeverTrigger() { - super(null); - } - - @Override - public void onElement(OnElementContext c) {} - - @Override - public void onMerge(OnMergeContext c) {} - - @Override - protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return this; - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) { - return false; - } - - @Override - protected void onOnlyFiring(Trigger.TriggerContext context) { - throw new UnsupportedOperationException( - String.format("%s should never fire", getClass().getSimpleName())); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java deleted file mode 100644 index 25b7b34..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import com.google.common.annotations.VisibleForTesting; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.util.ExecutableTrigger; -import org.joda.time.Instant; - -/** - * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires. - */ -class OrFinallyTrigger extends Trigger { - - private static final int ACTUAL = 0; - private static final int UNTIL = 1; - - @VisibleForTesting OrFinallyTrigger(Trigger actual, Trigger.OnceTrigger until) { - super(Arrays.asList(actual, until)); - } - - @Override - public void onElement(OnElementContext c) throws Exception { - c.trigger().subTrigger(ACTUAL).invokeOnElement(c); - c.trigger().subTrigger(UNTIL).invokeOnElement(c); - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - subTrigger.invokeOnMerge(c); - } - updateFinishedState(c); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - // This trigger fires once either the trigger or the until trigger fires. - Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window); - Instant untilDeadline = subTriggers.get(UNTIL).getWatermarkThatGuaranteesFiring(window); - return actualDeadline.isBefore(untilDeadline) ? actualDeadline : untilDeadline; - } - - @Override - public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - // Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL - // may not be a OnceTrigger. - return Repeatedly.forever( - new OrFinallyTrigger( - continuationTriggers.get(ACTUAL), - (Trigger.OnceTrigger) continuationTriggers.get(UNTIL))); - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context) - || context.trigger().subTrigger(UNTIL).invokeShouldFire(context); - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { - ExecutableTrigger actualSubtrigger = context.trigger().subTrigger(ACTUAL); - ExecutableTrigger untilSubtrigger = context.trigger().subTrigger(UNTIL); - - if (untilSubtrigger.invokeShouldFire(context)) { - untilSubtrigger.invokeOnFire(context); - actualSubtrigger.invokeClear(context); - } else { - // If until didn't fire, then the actual must have (or it is forbidden to call - // onFire) so we are done only if actual is done. - actualSubtrigger.invokeOnFire(context); - // Do not clear the until trigger, because it tracks data cross firings. - } - updateFinishedState(context); - } - - @Override - public String toString() { - return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL)); - } - - private void updateFinishedState(TriggerContext c) throws Exception { - boolean anyStillFinished = false; - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished(); - } - c.trigger().setFinished(anyStillFinished); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java deleted file mode 100644 index 8858798..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.util.ExecutableTrigger; -import org.joda.time.Instant; - -/** - * Repeat a trigger, either until some condition is met or forever. - * - * <p>For example, to fire after the end of the window, and every time late data arrives: - * <pre> {@code - * Repeatedly.forever(AfterWatermark.isPastEndOfWindow()); - * } </pre> - * - * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite - * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}. - */ -public class Repeatedly extends Trigger { - - private static final int REPEATED = 0; - - /** - * Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each - * time it fires and ignoring any indications to finish. - * - * <p>Unless used with {@link Trigger#orFinally} the composite trigger will never finish. - * - * @param repeated the trigger to execute repeatedly. - */ - public static Repeatedly forever(Trigger repeated) { - return new Repeatedly(repeated); - } - - private Repeatedly(Trigger repeated) { - super(Arrays.asList(repeated)); - } - - - @Override - public void onElement(OnElementContext c) throws Exception { - getRepeated(c).invokeOnElement(c); - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - getRepeated(c).invokeOnMerge(c); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - // This trigger fires once the repeated trigger fires. - return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window); - } - - @Override - public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return new Repeatedly(continuationTriggers.get(REPEATED)); - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return getRepeated(context).invokeShouldFire(context); - } - - @Override - public void onFire(TriggerContext context) throws Exception { - getRepeated(context).invokeOnFire(context); - - if (context.trigger().isFinished(REPEATED)) { - // Reset tree will recursively clear the finished bits, and invoke clear. - context.forTrigger(getRepeated(context)).trigger().resetTree(); - } - } - - @Override - public String toString() { - return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED)); - } - - private ExecutableTrigger getRepeated(TriggerContext context) { - return context.trigger().subTrigger(REPEATED); - } -}