http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java deleted file mode 100644 index 8858798..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.util.ExecutableTrigger; -import org.joda.time.Instant; - -/** - * Repeat a trigger, either until some condition is met or forever. - * - * <p>For example, to fire after the end of the window, and every time late data arrives: - * <pre> {@code - * Repeatedly.forever(AfterWatermark.isPastEndOfWindow()); - * } </pre> - * - * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite - * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}. 
- */ -public class Repeatedly extends Trigger { - /** Index of the single repeated sub-trigger within {@code subTriggers}. */ - private static final int REPEATED = 0; - - /** - * Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each - * time it fires and ignoring any indications to finish. - * - * <p>Unless used with {@link Trigger#orFinally} the composite trigger will never finish. - * - * @param repeated the trigger to execute repeatedly. - * @return a new {@code Repeatedly} wrapping {@code repeated}. - */ - public static Repeatedly forever(Trigger repeated) { - return new Repeatedly(repeated); - } - /** Registers {@code repeated} as the only sub-trigger. */ - private Repeatedly(Trigger repeated) { - super(Arrays.asList(repeated)); - } - - /** Forwards the element callback to the repeated sub-trigger. */ - @Override - public void onElement(OnElementContext c) throws Exception { - getRepeated(c).invokeOnElement(c); - } - /** Forwards the merge callback to the repeated sub-trigger. */ - @Override - public void onMerge(OnMergeContext c) throws Exception { - getRepeated(c).invokeOnMerge(c); - } - /** Returns the firing-guarantee watermark of the repeated sub-trigger for {@code window}. */ - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - // This trigger fires once the repeated trigger fires. - return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window); - } - /** Wraps the continuation of the repeated sub-trigger in a new {@code Repeatedly}. */ - @Override - public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return new Repeatedly(continuationTriggers.get(REPEATED)); - } - /** Reports whether the repeated sub-trigger is ready to fire. */ - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return getRepeated(context).invokeShouldFire(context); - } - /** Fires the repeated sub-trigger and, if that leaves it marked finished, resets its tree. */ - @Override - public void onFire(TriggerContext context) throws Exception { - getRepeated(context).invokeOnFire(context); - - if (context.trigger().isFinished(REPEATED)) { - // Reset tree will recursively clear the finished bits, and invoke clear. - context.forTrigger(getRepeated(context)).trigger().resetTree(); - } - } - /** Renders this trigger as {@code Repeatedly.forever(...)} around the sub-trigger's own text. */ - @Override - public String toString() { - return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED)); - } - /** Looks up the executable form of the repeated sub-trigger from {@code context}. */ - private ExecutableTrigger getRepeated(TriggerContext context) { - return context.trigger().subTrigger(REPEATED); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java deleted file mode 100644 index 9e2c27d..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.util.List; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.joda.time.Instant; - -/** - * The trigger used with {@link Reshuffle} which triggers on every element - * and never buffers state. - * - * @param <W> The kind of window that is being reshuffled. 
- */ -public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger { - /** Creates the trigger with no sub-triggers ({@code null} is passed to the base constructor). */ - public ReshuffleTrigger() { - super(null); - } - /** No-op: nothing is recorded when an element arrives. */ - @Override - public void onElement(Trigger.OnElementContext c) { } - /** No-op: nothing is done when windows merge. */ - @Override - public void onMerge(Trigger.OnMergeContext c) { } - /** Returns this same instance; the supplied continuation list is ignored. */ - @Override - protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return this; - } - /** Always throws {@link UnsupportedOperationException}; no watermark bound is provided. */ - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - throw new UnsupportedOperationException( - "ReshuffleTrigger should not be used outside of Reshuffle"); - } - /** Always returns {@code true}, so the trigger reports ready whenever it is asked. */ - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return true; - } - /** No-op: firing changes nothing, and the finished bit is not set here. */ - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } - /** Returns the fixed text {@code "ReshuffleTrigger()"}. */ - @Override - public String toString() { - return "ReshuffleTrigger()"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java deleted file mode 100644 index a960aa4..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java +++ /dev/null @@ -1,527 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import com.google.common.base.Joiner; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.util.ExecutableTrigger; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.joda.time.Instant; - -/** - * {@code Trigger}s control when the elements for a specific key and window are output. As elements - * arrive, they are put into one or more windows by a {@link Window} transform and its associated - * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the - * {@code Window}s contents should be output. - * - * <p>See {@link org.apache.beam.sdk.transforms.GroupByKey} and {@link Window} - * for more information about how grouping with windows works. - * - * <p>The elements that are assigned to a window since the last time it was fired (or since the - * window was created) are placed into the current window pane. Triggers are evaluated against the - * elements as they are added. When the root trigger fires, the elements in the current pane will be - * output. When the root trigger finishes (indicating it will never fire again), the window is - * closed and any new elements assigned to that window are discarded. 
- * - * <p>Several predefined {@code Trigger}s are provided: - * <ul> - * <li> {@link AfterWatermark} for firing when the watermark passes a timestamp determined from - * either the end of the window or the arrival of the first element in a pane. - * <li> {@link AfterProcessingTime} for firing after some amount of processing time has elapsed - * (typically since the first element in a pane). - * <li> {@link AfterPane} for firing off a property of the elements in the current pane, such as - * the number of elements that have been assigned to the current pane. - * </ul> - * - * <p>In addition, {@code Trigger}s can be combined in a variety of ways: - * <ul> - * <li> {@link Repeatedly#forever} to create a trigger that executes forever. Any time its - * argument finishes it gets reset and starts over. Can be combined with - * {@link Trigger#orFinally} to specify a condition that causes the repetition to stop. - * <li> {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every) - * time that a trigger fires, and advancing to the next trigger in the sequence when it finishes. - * <li> {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments - * fires. An {@link AfterFirst} trigger finishes after it fires once. - * <li> {@link AfterAll#of} to create a trigger that fires after all of its arguments - * have fired at least once. An {@link AfterAll} trigger finishes after it fires once. - * </ul> - * - * <p>Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one - * of the following states: - * <ul> - * <li> Never Existed - before the trigger has started executing, there is no state associated - * with it anywhere in the system. A trigger moves to the executing state as soon as it - * processes in the current pane. - * <li> Executing - while the trigger is receiving items and may fire.
While it is in this state, - * it may persist book-keeping information to persisted state, set timers, etc. - * <li> Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the - * system remembers only that it is finished. Entering this state causes us to discard any - * elements in the buffer for that window, as well. - * </ul> - * - * <p>Once finished, a trigger cannot return itself back to an earlier state, however a composite - * trigger could reset its sub-triggers. - * - * <p>Triggers should not build up any state internally since they may be recreated - * between invocations of the callbacks. All important values should be persisted using - * state before the callback returns. - */ -@Experimental(Experimental.Kind.TRIGGER) -public abstract class Trigger implements Serializable { - - /** - * Interface for accessing information about the trigger being executed and other triggers in the - * same tree. - */ - public interface TriggerInfo { - - /** - * Returns true if the windowing strategy of the current {@code PCollection} is a merging - * WindowFn. If true, the trigger execution needs to keep enough information to support the - * possibility of {@link Trigger#onMerge} being called. If false, {@link Trigger#onMerge} will - * never be called. - */ - boolean isMerging(); - - /** - * Access the executable versions of the sub-triggers of the current trigger. - */ - Iterable<ExecutableTrigger> subTriggers(); - - /** - * Access the executable version of the specified sub-trigger. - */ - ExecutableTrigger subTrigger(int subtriggerIndex); - - /** - * Returns true if the current trigger is marked finished. - */ - boolean isFinished(); - - /** - * Return true if the given subtrigger is marked finished. - */ - boolean isFinished(int subtriggerIndex); - - /** - * Returns true if all the sub-triggers of the current trigger are marked finished. 
- */ - boolean areAllSubtriggersFinished(); - - /** - * Returns an iterable over the unfinished sub-triggers of the current trigger. - */ - Iterable<ExecutableTrigger> unfinishedSubTriggers(); - - /** - * Returns the first unfinished sub-trigger. - */ - ExecutableTrigger firstUnfinishedSubTrigger(); - - /** - * Clears all keyed state for triggers in the current sub-tree and unsets all the associated - * finished bits. - */ - void resetTree() throws Exception; - - /** - * Sets the finished bit for the current trigger. - */ - void setFinished(boolean finished); - - /** - * Sets the finished bit for the given sub-trigger. - */ - void setFinished(boolean finished, int subTriggerIndex); - } - - /** - * Interact with properties of the trigger being executed, with extensions to deal with the - * merging windows. - */ - public interface MergingTriggerInfo extends TriggerInfo { - - /** Return true if the trigger is finished in any window being merged. */ - public abstract boolean finishedInAnyMergingWindow(); - - /** Return true if the trigger is finished in all windows being merged. */ - public abstract boolean finishedInAllMergingWindows(); - } - - /** - * Information accessible to all operational hooks in this {@code Trigger}. - * - * <p>Used directly in {@link Trigger#shouldFire} and {@link Trigger#clear}, and - * extended with additional information in other methods. - */ - public abstract class TriggerContext { - - /** Returns the interface for accessing trigger info. */ - public abstract TriggerInfo trigger(); - - /** Returns the interface for accessing persistent state. */ - public abstract StateAccessor<?> state(); - - /** The window that the current context is executing in. */ - public abstract BoundedWindow window(); - - /** Create a sub-context for the given sub-trigger. */ - public abstract TriggerContext forTrigger(ExecutableTrigger trigger); - - /** - * Removes the timer set in this trigger context for the given {@link Instant} - * and {@link TimeDomain}. 
- */ - public abstract void deleteTimer(Instant timestamp, TimeDomain domain); - - /** The current processing time. */ - public abstract Instant currentProcessingTime(); - - /** The current synchronized upstream processing time or {@code null} if unknown. */ - @Nullable - public abstract Instant currentSynchronizedProcessingTime(); - - /** The current event time for the input or {@code null} if unknown. */ - @Nullable - public abstract Instant currentEventTime(); - } - - /** - * Extended {@link TriggerContext} containing information accessible to the {@link #onElement} - * operational hook. - */ - public abstract class OnElementContext extends TriggerContext { - /** The event timestamp of the element currently being processed. */ - public abstract Instant eventTimestamp(); - - /** - * Sets a timer to fire when the watermark or processing time is beyond the given timestamp. - * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards. - * - * <p>As with {@link #state}, timers are implicitly scoped to the current window. All - * timer firings for a window will be received, but the implementation should choose to ignore - * those that are not applicable. - * - * @param timestamp the time at which the trigger should be re-evaluated - * @param domain the domain that the {@code timestamp} applies to - */ - public abstract void setTimer(Instant timestamp, TimeDomain domain); - - /** Create an {@code OnElementContext} for executing the given trigger. */ - @Override - public abstract OnElementContext forTrigger(ExecutableTrigger trigger); - } - - /** - * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge} - * operational hook. - */ - public abstract class OnMergeContext extends TriggerContext { - /** - * Sets a timer to fire when the watermark or processing time is beyond the given timestamp. - * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards. 
- * - * <p>As with {@link #state}, timers are implicitly scoped to the current window. All - * timer firings for a window will be received, but the implementation should choose to ignore - * those that are not applicable. - * - * @param timestamp the time at which the trigger should be re-evaluated - * @param domain the domain that the {@code timestamp} applies to - */ - public abstract void setTimer(Instant timestamp, TimeDomain domain); - - /** Create an {@code OnMergeContext} for executing the given trigger. */ - @Override - public abstract OnMergeContext forTrigger(ExecutableTrigger trigger); - - @Override - public abstract MergingStateAccessor<?, ?> state(); - - @Override - public abstract MergingTriggerInfo trigger(); - } - - @Nullable - protected final List<Trigger> subTriggers; - - protected Trigger(@Nullable List<Trigger> subTriggers) { - this.subTriggers = subTriggers; - } - - - /** - * Called every time an element is incorporated into a window. - */ - public abstract void onElement(OnElementContext c) throws Exception; - - /** - * Called immediately after windows have been merged. - * - * <p>Leaf triggers should update their state by inspecting their status and any state - * in the merging windows. Composite triggers should update their state by calling - * {@link ExecutableTrigger#invokeOnMerge} on their sub-triggers, and applying appropriate logic. - * - * <p>A trigger such as {@link AfterWatermark#pastEndOfWindow} may no longer be finished; - * it is the responsibility of the trigger itself to record this fact. It is forbidden for - * a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending - * elements that led to it being ready to fire. - * - * <p>The implementation does not need to clear out any state associated with the old windows. 
- */ - public abstract void onMerge(OnMergeContext c) throws Exception; - - /** - * Returns {@code true} if the current state of the trigger indicates that its condition - * is satisfied and it is ready to fire. - */ - public abstract boolean shouldFire(TriggerContext context) throws Exception; - - /** - * Adjusts the state of the trigger to be ready for the next pane. For example, a - * {@link Repeatedly} trigger will reset its inner trigger, since it has fired. - * - * <p>If the trigger is finished, it is the responsibility of the trigger itself to - * record that fact via the {@code context}. - */ - public abstract void onFire(TriggerContext context) throws Exception; - - /** - * Called to allow the trigger to prefetch any state it will likely need to read from during - * an {@link #onElement} call. - */ - public void prefetchOnElement(StateAccessor<?> state) { - if (subTriggers != null) { - for (Trigger trigger : subTriggers) { - trigger.prefetchOnElement(state); - } - } - } - - /** - * Called to allow the trigger to prefetch any state it will likely need to read from during - * an {@link #onMerge} call. - */ - public void prefetchOnMerge(MergingStateAccessor<?, ?> state) { - if (subTriggers != null) { - for (Trigger trigger : subTriggers) { - trigger.prefetchOnMerge(state); - } - } - } - - /** - * Called to allow the trigger to prefetch any state it will likely need to read from during - * an {@link #shouldFire} call. - */ - public void prefetchShouldFire(StateAccessor<?> state) { - if (subTriggers != null) { - for (Trigger trigger : subTriggers) { - trigger.prefetchShouldFire(state); - } - } - } - - /** - * Called to allow the trigger to prefetch any state it will likely need to read from during - * an {@link #onFire} call. 
- */ - public void prefetchOnFire(StateAccessor<?> state) { - if (subTriggers != null) { - for (Trigger trigger : subTriggers) { - trigger.prefetchOnFire(state); - } - } - } - - /** - * Clear any state associated with this trigger in the given window. - * - * <p>This is called after a trigger has indicated it will never fire again. The trigger system - * keeps enough information to know that the trigger is finished, so this trigger should clear all - * of its state. - */ - public void clear(TriggerContext c) throws Exception { - if (subTriggers != null) { - for (ExecutableTrigger trigger : c.trigger().subTriggers()) { - trigger.invokeClear(c); - } - } - } - - public Iterable<Trigger> subTriggers() { - return subTriggers; - } - - /** - * Return a trigger to use after a {@code GroupByKey} to preserve the - * intention of this trigger. Specifically, triggers that are time based - * and intended to provide speculative results should continue providing - * speculative results. Triggers that fire once (or multiple times) should - * continue firing once (or multiple times). - */ - public Trigger getContinuationTrigger() { - if (subTriggers == null) { - return getContinuationTrigger(null); - } - - List<Trigger> subTriggerContinuations = new ArrayList<>(); - for (Trigger subTrigger : subTriggers) { - subTriggerContinuations.add(subTrigger.getContinuationTrigger()); - } - return getContinuationTrigger(subTriggerContinuations); - } - - /** - * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For convenience, this - * is provided the continuation trigger of each of the sub-triggers. - */ - protected abstract Trigger getContinuationTrigger(List<Trigger> continuationTriggers); - - /** - * Returns a bound in watermark time by which this trigger would have fired at least once - * for a given window had there been input data. This is a static property of a trigger - * that does not depend on its state. 
- * - * <p>For triggers that do not fire based on the watermark advancing, returns - * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. - * - * <p>This estimate is used to determine that there are no elements in a side-input window, which - * causes the default value to be used instead. - */ - public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window); - - /** - * Returns whether this performs the same triggering as the given {@code Trigger}. - */ - public boolean isCompatible(Trigger other) { - if (!getClass().equals(other.getClass())) { - return false; - } - - if (subTriggers == null) { - return other.subTriggers == null; - } else if (other.subTriggers == null) { - return false; - } else if (subTriggers.size() != other.subTriggers.size()) { - return false; - } - - for (int i = 0; i < subTriggers.size(); i++) { - if (!subTriggers.get(i).isCompatible(other.subTriggers.get(i))) { - return false; - } - } - - return true; - } - - @Override - public String toString() { - String simpleName = getClass().getSimpleName(); - if (getClass().getEnclosingClass() != null) { - simpleName = getClass().getEnclosingClass().getSimpleName() + "." + simpleName; - } - if (subTriggers == null || subTriggers.size() == 0) { - return simpleName; - } else { - return simpleName + "(" + Joiner.on(", ").join(subTriggers) + ")"; - } - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof Trigger)) { - return false; - } - Trigger that = (Trigger) obj; - return Objects.equals(getClass(), that.getClass()) - && Objects.equals(subTriggers, that.subTriggers); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), subTriggers); - } - - /** - * Specify an ending condition for this trigger. If the {@code until} fires then the combination - * fires. 
- * - * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes - * as soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time - * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that - * {@code t1} may have fired since {@code t2} started, so not all of the elements that {@code t2} - * has seen are necessarily in the current pane. - * - * <p>For example the final firing of the following trigger may only have 1 element: - * <pre> {@code - * Repeatedly.forever(AfterPane.elementCountAtLeast(2)) - * .orFinally(AfterPane.elementCountAtLeast(5)) - * } </pre> - * - * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same - * as {@code AfterFirst.of(t1, t2)}. - */ - public Trigger orFinally(OnceTrigger until) { - return new OrFinallyTrigger(this, until); - } - - /** - * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather - * than the general {@link Trigger} class to indicate that behavior. - */ - public abstract static class OnceTrigger extends Trigger { - protected OnceTrigger(List<Trigger> subTriggers) { - super(subTriggers); - } - - @Override - public final OnceTrigger getContinuationTrigger() { - Trigger continuation = super.getContinuationTrigger(); - if (!(continuation instanceof OnceTrigger)) { - throw new IllegalStateException("Continuation of a OnceTrigger must be a OnceTrigger"); - } - return (OnceTrigger) continuation; - } - - /** - * {@inheritDoc} - */ - @Override - public final void onFire(TriggerContext context) throws Exception { - onOnlyFiring(context); - context.trigger().setFinished(true); - } - - /** - * Called exactly once by {@link #onFire} when the trigger is fired. By default, - * invokes {@link #onFire} on all subtriggers for which {@link #shouldFire} is {@code true}. 
- */ - protected abstract void onOnlyFiring(TriggerContext context) throws Exception; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java deleted file mode 100644 index e09aac2..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java +++ /dev/null @@ -1,507 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import java.util.Collection; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Trigger.MergingTriggerInfo; -import org.apache.beam.sdk.transforms.windowing.Trigger.TriggerInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateTag; -import org.joda.time.Instant; - -/** - * Factory for creating instances of the various {@link Trigger} contexts. - * - * <p>These contexts are highly interdependent and share many fields; it is inadvisable - * to create them via any means other than this factory class. - */ -public class TriggerContextFactory<W extends BoundedWindow> { - - private final WindowFn<?, W> windowFn; - private StateInternals<?> stateInternals; - private final Coder<W> windowCoder; - - public TriggerContextFactory(WindowFn<?, W> windowFn, - StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) { - // Future triggers may be able to exploit the active window to state address window mapping. 
- this.windowFn = windowFn; - this.stateInternals = stateInternals; - this.windowCoder = windowFn.windowCoder(); - } - - public Trigger.TriggerContext base(W window, Timers timers, - ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) { - return new TriggerContextImpl(window, timers, rootTrigger, finishedSet); - } - - public Trigger.OnElementContext createOnElementContext( - W window, Timers timers, Instant elementTimestamp, - ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) { - return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp); - } - - public Trigger.OnMergeContext createOnMergeContext(W window, Timers timers, - ExecutableTrigger rootTrigger, FinishedTriggers finishedSet, - Map<W, FinishedTriggers> finishedSets) { - return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets); - } - - public StateAccessor<?> createStateAccessor(W window, ExecutableTrigger trigger) { - return new StateAccessorImpl(window, trigger); - } - - public MergingStateAccessor<?, W> createMergingStateAccessor( - W mergeResult, Collection<W> mergingWindows, ExecutableTrigger trigger) { - return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult); - } - - private class TriggerInfoImpl implements Trigger.TriggerInfo { - - protected final ExecutableTrigger trigger; - protected final FinishedTriggers finishedSet; - private final Trigger.TriggerContext context; - - public TriggerInfoImpl(ExecutableTrigger trigger, FinishedTriggers finishedSet, - Trigger.TriggerContext context) { - this.trigger = trigger; - this.finishedSet = finishedSet; - this.context = context; - } - - @Override - public boolean isMerging() { - return !windowFn.isNonMerging(); - } - - @Override - public Iterable<ExecutableTrigger> subTriggers() { - return trigger.subTriggers(); - } - - @Override - public ExecutableTrigger subTrigger(int subtriggerIndex) { - return trigger.subTriggers().get(subtriggerIndex); - } - - @Override - 
public boolean isFinished() { - return finishedSet.isFinished(trigger); - } - - @Override - public boolean isFinished(int subtriggerIndex) { - return finishedSet.isFinished(subTrigger(subtriggerIndex)); - } - - @Override - public boolean areAllSubtriggersFinished() { - return Iterables.isEmpty(unfinishedSubTriggers()); - } - - @Override - public Iterable<ExecutableTrigger> unfinishedSubTriggers() { - return FluentIterable - .from(trigger.subTriggers()) - .filter(new Predicate<ExecutableTrigger>() { - @Override - public boolean apply(ExecutableTrigger trigger) { - return !finishedSet.isFinished(trigger); - } - }); - } - - @Override - public ExecutableTrigger firstUnfinishedSubTrigger() { - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { - if (!finishedSet.isFinished(subTrigger)) { - return subTrigger; - } - } - return null; - } - - @Override - public void resetTree() throws Exception { - finishedSet.clearRecursively(trigger); - trigger.invokeClear(context); - } - - @Override - public void setFinished(boolean finished) { - finishedSet.setFinished(trigger, finished); - } - - @Override - public void setFinished(boolean finished, int subTriggerIndex) { - finishedSet.setFinished(subTrigger(subTriggerIndex), finished); - } - } - - private class TriggerTimers implements Timers { - - private final Timers timers; - private final W window; - - public TriggerTimers(W window, Timers timers) { - this.timers = timers; - this.window = window; - } - - @Override - public void setTimer(Instant timestamp, TimeDomain timeDomain) { - timers.setTimer(timestamp, timeDomain); - } - - @Override - public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { - if (timeDomain == TimeDomain.EVENT_TIME - && timestamp.equals(window.maxTimestamp())) { - // Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time - // state transitions. 
- return; - } - timers.deleteTimer(timestamp, timeDomain); - } - - @Override - public Instant currentProcessingTime() { - return timers.currentProcessingTime(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return timers.currentSynchronizedProcessingTime(); - } - - @Override - public Instant currentEventTime() { - return timers.currentEventTime(); - } - } - - private class MergingTriggerInfoImpl - extends TriggerInfoImpl implements Trigger.MergingTriggerInfo { - - private final Map<W, FinishedTriggers> finishedSets; - - public MergingTriggerInfoImpl( - ExecutableTrigger trigger, - FinishedTriggers finishedSet, - Trigger.TriggerContext context, - Map<W, FinishedTriggers> finishedSets) { - super(trigger, finishedSet, context); - this.finishedSets = finishedSets; - } - - @Override - public boolean finishedInAnyMergingWindow() { - for (FinishedTriggers finishedSet : finishedSets.values()) { - if (finishedSet.isFinished(trigger)) { - return true; - } - } - return false; - } - - @Override - public boolean finishedInAllMergingWindows() { - for (FinishedTriggers finishedSet : finishedSets.values()) { - if (!finishedSet.isFinished(trigger)) { - return false; - } - } - return true; - } - } - - private class StateAccessorImpl implements StateAccessor<Object> { - protected final int triggerIndex; - protected final StateNamespace windowNamespace; - - public StateAccessorImpl( - W window, - ExecutableTrigger trigger) { - this.triggerIndex = trigger.getTriggerIndex(); - this.windowNamespace = namespaceFor(window); - } - - protected StateNamespace namespaceFor(W window) { - return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex); - } - - @Override - public <StateT extends State> StateT access(StateTag<? 
super Object, StateT> address) { - return stateInternals.state(windowNamespace, address); - } - } - - private class MergingStateAccessorImpl extends StateAccessorImpl - implements MergingStateAccessor<Object, W> { - private final Collection<W> activeToBeMerged; - - public MergingStateAccessorImpl(ExecutableTrigger trigger, Collection<W> activeToBeMerged, - W mergeResult) { - super(mergeResult, trigger); - this.activeToBeMerged = activeToBeMerged; - } - - @Override - public <StateT extends State> StateT access( - StateTag<? super Object, StateT> address) { - return stateInternals.state(windowNamespace, address); - } - - @Override - public <StateT extends State> Map<W, StateT> accessInEachMergingWindow( - StateTag<? super Object, StateT> address) { - ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder(); - for (W mergingWindow : activeToBeMerged) { - StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address); - builder.put(mergingWindow, stateForWindow); - } - return builder.build(); - } - } - - private class TriggerContextImpl extends Trigger.TriggerContext { - - private final W window; - private final StateAccessorImpl state; - private final Timers timers; - private final TriggerInfoImpl triggerInfo; - - private TriggerContextImpl( - W window, - Timers timers, - ExecutableTrigger trigger, - FinishedTriggers finishedSet) { - trigger.getSpec().super(); - this.window = window; - this.state = new StateAccessorImpl(window, trigger); - this.timers = new TriggerTimers(window, timers); - this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this); - } - - @Override - public Trigger.TriggerContext forTrigger(ExecutableTrigger trigger) { - return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet); - } - - @Override - public TriggerInfo trigger() { - return triggerInfo; - } - - @Override - public StateAccessor<?> state() { - return state; - } - - @Override - public W window() { - return window; - } - - @Override 
- public void deleteTimer(Instant timestamp, TimeDomain domain) { - timers.deleteTimer(timestamp, domain); - } - - @Override - public Instant currentProcessingTime() { - return timers.currentProcessingTime(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return timers.currentSynchronizedProcessingTime(); - } - - @Override - @Nullable - public Instant currentEventTime() { - return timers.currentEventTime(); - } - } - - private class OnElementContextImpl extends Trigger.OnElementContext { - - private final W window; - private final StateAccessorImpl state; - private final Timers timers; - private final TriggerInfoImpl triggerInfo; - private final Instant eventTimestamp; - - private OnElementContextImpl( - W window, - Timers timers, - ExecutableTrigger trigger, - FinishedTriggers finishedSet, - Instant eventTimestamp) { - trigger.getSpec().super(); - this.window = window; - this.state = new StateAccessorImpl(window, trigger); - this.timers = new TriggerTimers(window, timers); - this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this); - this.eventTimestamp = eventTimestamp; - } - - - @Override - public Instant eventTimestamp() { - return eventTimestamp; - } - - @Override - public Trigger.OnElementContext forTrigger(ExecutableTrigger trigger) { - return new OnElementContextImpl( - window, timers, trigger, triggerInfo.finishedSet, eventTimestamp); - } - - @Override - public TriggerInfo trigger() { - return triggerInfo; - } - - @Override - public StateAccessor<?> state() { - return state; - } - - @Override - public W window() { - return window; - } - - @Override - public void setTimer(Instant timestamp, TimeDomain domain) { - timers.setTimer(timestamp, domain); - } - - - @Override - public void deleteTimer(Instant timestamp, TimeDomain domain) { - timers.deleteTimer(timestamp, domain); - } - - @Override - public Instant currentProcessingTime() { - return timers.currentProcessingTime(); - } - - @Override - @Nullable - 
public Instant currentSynchronizedProcessingTime() { - return timers.currentSynchronizedProcessingTime(); - } - - @Override - @Nullable - public Instant currentEventTime() { - return timers.currentEventTime(); - } - } - - private class OnMergeContextImpl extends Trigger.OnMergeContext { - private final MergingStateAccessor<?, W> state; - private final W window; - private final Collection<W> mergingWindows; - private final Timers timers; - private final MergingTriggerInfoImpl triggerInfo; - - private OnMergeContextImpl( - W window, - Timers timers, - ExecutableTrigger trigger, - FinishedTriggers finishedSet, - Map<W, FinishedTriggers> finishedSets) { - trigger.getSpec().super(); - this.mergingWindows = finishedSets.keySet(); - this.window = window; - this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window); - this.timers = new TriggerTimers(window, timers); - this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets); - } - - @Override - public Trigger.OnMergeContext forTrigger(ExecutableTrigger trigger) { - return new OnMergeContextImpl( - window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets); - } - - @Override - public MergingStateAccessor<?, W> state() { - return state; - } - - @Override - public MergingTriggerInfo trigger() { - return triggerInfo; - } - - @Override - public W window() { - return window; - } - - @Override - public void setTimer(Instant timestamp, TimeDomain domain) { - timers.setTimer(timestamp, domain); - } - - @Override - public void deleteTimer(Instant timestamp, TimeDomain domain) { - timers.setTimer(timestamp, domain); - - } - - @Override - public Instant currentProcessingTime() { - return timers.currentProcessingTime(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return timers.currentSynchronizedProcessingTime(); - } - - @Override - @Nullable - public Instant currentEventTime() { - return timers.currentEventTime(); - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java
deleted file mode 100644
index 8d0f322..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.BitSetCoder;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.FinishedTriggers;
-import org.apache.beam.sdk.util.FinishedTriggersBitSet;
-import org.apache.beam.sdk.util.Timers;
-import org.apache.beam.sdk.util.TriggerContextFactory;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.joda.time.Instant;
-
-/**
- * Executes a trigger while managing persistence of information about which subtriggers are
- * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
- *
- * <p>Specifically, the responsibilities are:
- *
- * <ul>
- *   <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
- *       constructing the appropriate trigger contexts.</li>
- *   <li>Committing a record of which subtriggers are finished to persistent state.</li>
- *   <li>Restoring the record of which subtriggers are finished from persistent state.</li>
- *   <li>Clearing out the persisted finished set when a caller indicates
- *       (via {@link #clearFinished}) that it is no longer needed.</li>
- * </ul>
- *
- * <p>These responsibilities are intertwined: trigger contexts include mutable information about
- * which subtriggers are finished. This class provides the information when building the contexts
- * and commits the information when the method of the {@link ExecutableTrigger} returns.
- *
- * @param <W> The kind of windows being processed.
- */
-public class TriggerRunner<W extends BoundedWindow> {
-  @VisibleForTesting
-  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
-      StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
-
-  private final ExecutableTrigger rootTrigger;
-  private final TriggerContextFactory<W> contextFactory;
-
-  public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
-    checkState(rootTrigger.getTriggerIndex() == 0);
-    this.rootTrigger = rootTrigger;
-    this.contextFactory = contextFactory;
-  }
-
-  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // If no trigger in the tree will ever have finished bits, then we don't need to read them.
-      // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
-      // finished) for each trigger in the tree.
-      return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
-    }
-
-    BitSet bitSet = state.read();
-    return bitSet == null
-        ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
-            : FinishedTriggersBitSet.fromBitSet(bitSet);
-  }
-
-
-  private void clearFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // Nothing to clear.
-      return;
-    }
-    state.clear();
-  }
-
-  /** Return true if the trigger is closed in the window corresponding to the specified state. */
-  public boolean isClosed(StateAccessor<?> state) {
-    return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchForValue(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchOnElement(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchOnFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchShouldFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchShouldFire(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  /**
-   * Run the trigger logic to deal with a new value.
-   */
-  public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
-      throws Exception {
-    // Clone so that we can detect changes and so that changes here don't pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext(
-        window, timers, timestamp, rootTrigger, finishedSet);
-    rootTrigger.invokeOnElement(triggerContext);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchForMerge(
-      W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
-    if (isFinishedSetNeeded()) {
-      for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
-        value.readLater();
-      }
-    }
-    rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
-        window, mergingWindows, rootTrigger));
-  }
-
-  /**
-   * Run the trigger merging logic as part of executing the specified merge.
-   */
-  public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
-    // Clone so that we can detect changes and so that changes here don't pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-
-    // And read the finished bits in each merging window.
-    ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
-    for (Map.Entry<W, ValueState<BitSet>> entry :
-        state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
-      // Don't need to clone these, since the trigger context doesn't allow modification
-      builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
-      // Clear the underlying finished bits.
-      clearFinishedBits(entry.getValue());
-    }
-    ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
-
-    Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
-        window, timers, rootTrigger, finishedSet, mergingFinishedSets);
-
-    // Run the merge from the trigger
-    rootTrigger.invokeOnMerge(mergeContext);
-
-    persistFinishedSet(state, finishedSet);
-  }
-
-  public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    return rootTrigger.invokeShouldFire(context);
-  }
-
-  public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    // NOTE(review): originally "shouldFire should be false"; presumably "true" was meant, since
-    // firing follows a positive shouldFire. Confirm. Either way it is too expensive to assert.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    rootTrigger.invokeOnFire(context);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  private void persistFinishedSet(
-      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
-    if (!isFinishedSetNeeded()) {
-      return;
-    }
-
-    ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
-    if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
-      if (modifiedFinishedSet.getBitSet().isEmpty()) {
-        finishedSetState.clear();
-      } else {
-        finishedSetState.write(modifiedFinishedSet.getBitSet());
-      }
-    }
-  }
-
-  /**
-   * Clear the finished bits.
-   */
-  public void clearFinished(StateAccessor<?> state) {
-    clearFinishedBits(state.access(FINISHED_BITS_TAG));
-  }
-
-  /**
-   * Clear the state used for executing triggers, but leave the finished set to indicate
-   * the window is closed.
-   */
-  public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    // Don't need to clone, because we'll be clearing the finished bits anyways.
-    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
-    rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
-  }
-
-  private boolean isFinishedSetNeeded() {
-    // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
-    // lookup. Right now, we special case this for the DefaultTrigger.
-    return !(rootTrigger.getSpec() instanceof DefaultTrigger);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
new file mode 100644
index 0000000..2f4ad63
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.sdk.annotations.Experimental; + +/** + * A {@link TriggerStateMachine} that fires and finishes once after all of its sub-triggers + * have fired. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class AfterAllStateMachine extends OnceTriggerStateMachine { + + private AfterAllStateMachine(List<TriggerStateMachine> subTriggers) { + super(subTriggers); + checkArgument(subTriggers.size() > 1); + } + + /** + * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers. + */ + @SafeVarargs + public static OnceTriggerStateMachine of(OnceTriggerStateMachine... triggers) { + return new AfterAllStateMachine(Arrays.<TriggerStateMachine>asList(triggers)); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + for (ExecutableTriggerStateMachine subTrigger : c.trigger().unfinishedSubTriggers()) { + // Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH. + // invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH. 
+ subTrigger.invokeOnElement(c); + } + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) { + subTrigger.invokeOnMerge(c); + } + boolean allFinished = true; + for (ExecutableTriggerStateMachine subTrigger1 : c.trigger().subTriggers()) { + allFinished &= c.forTrigger(subTrigger1).trigger().isFinished(); + } + c.trigger().setFinished(allFinished); + } + + /** + * {@inheritDoc} + * + * @return {@code true} if all subtriggers return {@code true}. + */ + @Override + public boolean shouldFire(TriggerContext context) throws Exception { + for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { + if (!context.forTrigger(subtrigger).trigger().isFinished() + && !subtrigger.invokeShouldFire(context)) { + return false; + } + } + return true; + } + + /** + * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire} + * because they all must be ready to fire. 
+ */ + @Override + public void onOnlyFiring(TriggerContext context) throws Exception { + for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { + subtrigger.invokeOnFire(context); + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterAll.of("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java new file mode 100644 index 0000000..a6616fa --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import com.google.common.collect.ImmutableList; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateMerging; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.format.PeriodFormat; +import org.joda.time.format.PeriodFormatter; + +/** + * A base class for triggers that happen after a processing time delay from the arrival + * of the first element in a pane. + * + * <p>This class is for internal use only and may change at any time. 
+ */ +@Experimental(Experimental.Kind.TRIGGER) +public abstract class AfterDelayFromFirstElementStateMachine extends OnceTriggerStateMachine { + + protected static final List<SerializableFunction<Instant, Instant>> IDENTITY = + ImmutableList.<SerializableFunction<Instant, Instant>>of(); + + protected static final StateTag<Object, AccumulatorCombiningState<Instant, + Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG = + StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( + "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder())); + + private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH); + + /** + * To complete an implementation, return the desired time from the TriggerContext. + */ + @Nullable + public abstract Instant getCurrentTime(TriggerStateMachine.TriggerContext context); + + /** + * To complete an implementation, return a new instance like this one, but incorporating + * the provided timestamp mapping functions. Generally should be used by calling the + * constructor of this class from the constructor of the subclass. + */ + protected abstract AfterDelayFromFirstElementStateMachine newWith( + List<SerializableFunction<Instant, Instant>> transform); + + /** + * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed in sequence. The + * overall mapping for an instance `instance` is `m_n(... 
m3(m2(m1(instant))`, + * implemented via #computeTargetTimestamp + */ + protected final List<SerializableFunction<Instant, Instant>> timestampMappers; + + private final TimeDomain timeDomain; + + public AfterDelayFromFirstElementStateMachine( + TimeDomain timeDomain, + List<SerializableFunction<Instant, Instant>> timestampMappers) { + super(null); + this.timestampMappers = timestampMappers; + this.timeDomain = timeDomain; + } + + private Instant getTargetTimestamp(OnElementContext c) { + return computeTargetTimestamp(c.currentProcessingTime()); + } + + /** + * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater + * than the timestamp. + * + * <p>TODO: Consider sharing this with FixedWindows, and bring over the equivalent of + * CalendarWindows. + */ + public AfterDelayFromFirstElementStateMachine alignedTo( + final Duration size, final Instant offset) { + return newWith(new AlignFn(size, offset)); + } + + /** + * Aligns the time to be the smallest multiple of {@code size} greater than the timestamp + * since the epoch. + */ + public AfterDelayFromFirstElementStateMachine alignedTo(final Duration size) { + return alignedTo(size, new Instant(0)); + } + + /** + * Adds some delay to the original target time. + * + * @param delay the delay to add + * @return An updated time trigger that will wait the additional time before firing. 
+ */ + public AfterDelayFromFirstElementStateMachine plusDelayOf(final Duration delay) { + return newWith(new DelayFn(delay)); + } + + /** + * Two instances are compatible only when they have exactly the same class and equal + * {@code timestampMappers} lists. + */ + @Override + public boolean isCompatible(TriggerStateMachine other) { + if (!getClass().equals(other.getClass())) { + return false; + } + + AfterDelayFromFirstElementStateMachine that = (AfterDelayFromFirstElementStateMachine) other; + return this.timestampMappers.equals(that.timestampMappers); + } + + + /** + * Builds a new immutable list holding the current mappers followed by {@code timestampMapper} + * and hands it to the list-based {@code newWith} overload. + */ + private AfterDelayFromFirstElementStateMachine newWith( + SerializableFunction<Instant, Instant> timestampMapper) { + return newWith( + ImmutableList.<SerializableFunction<Instant, Instant>>builder() + .addAll(timestampMappers) + .add(timestampMapper) + .build()); + } + + @Override + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = + "prefetch side effect") + public void prefetchOnElement(StateAccessor<?> state) { + state.access(DELAYED_UNTIL_TAG).readLater(); + } + + /** + * When no target time is stored yet, records the target time for this element and sets a + * timer for it in {@code timeDomain}; otherwise leaves the stored target untouched. + */ + @Override + public void onElement(OnElementContext c) throws Exception { + CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG); + Instant oldDelayUntil = delayUntilState.read(); + + // Since processing time can only advance, resulting in target wake-up times we would + // ignore anyhow, we don't bother with it if it is already set. + if (oldDelayUntil != null) { + return; + } + + Instant targetTimestamp = getTargetTimestamp(c); + delayUntilState.add(targetTimestamp); + c.setTimer(targetTimestamp, timeDomain); + } + + @Override + public void prefetchOnMerge(MergingStateAccessor<?, ?> state) { + super.prefetchOnMerge(state); + StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + // NOTE: We could try to delete all timers which are still active, but we would + // need access to a timer context for each merging window. 
+ // for (CombiningValueStateInternal<Instant, Combine.Holder<Instant>, Instant> state : + // c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) { + // Instant timestamp = state.get().read(); + // if (timestamp != null) { + // <context for merging window>.deleteTimer(timestamp, timeDomain); + // } + // } + // Instead let them fire and be ignored. + + // If the trigger is already finished, there is no way it will become re-activated + if (c.trigger().isFinished()) { + StateMerging.clear(c.state(), DELAYED_UNTIL_TAG); + // NOTE: We do not attempt to delete the timers. + return; + } + + // Determine the earliest point across all the windows, and delay to that. + StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG); + + Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read(); + if (earliestTargetTime != null) { + c.setTimer(earliestTargetTime, timeDomain); + } + } + + @Override + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = + "prefetch side effect") + public void prefetchShouldFire(StateAccessor<?> state) { + state.access(DELAYED_UNTIL_TAG).readLater(); + } + + /** Discards the stored target time. */ + @Override + public void clear(TriggerContext c) throws Exception { + c.state().access(DELAYED_UNTIL_TAG).clear(); + } + + /** + * Returns true only when a target time is stored, {@code getCurrentTime} yields a non-null + * instant, and that instant is after the stored target. + */ + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read(); + return delayedUntil != null + && getCurrentTime(context) != null + && getCurrentTime(context).isAfter(delayedUntil); + } + + /** Firing uses up the pending delay, so the stored target time is cleared. */ + @Override + protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { + clear(context); + } + + /** + * Feeds {@code time} through every entry of {@code timestampMappers} in list order, each + * mapper receiving the previous mapper's output. + * + * @param time the starting instant + * @return the output of the last mapper, or {@code time} itself when the list is empty + */ + protected Instant computeTargetTimestamp(Instant time) { + Instant result = time; + for (SerializableFunction<Instant, Instant> timestampMapper : timestampMappers) { + result = timestampMapper.apply(result); + } + return result; + } + + /** + * A {@link SerializableFunction} to delay the 
timestamp at which this trigger fires. + */ + private static final class DelayFn implements SerializableFunction<Instant, Instant> { + private final Duration delay; + + public DelayFn(Duration delay) { + this.delay = delay; + } + + /** Returns {@code input} plus the configured delay. */ + @Override + public Instant apply(Instant input) { + return input.plus(delay); + } + + @Override + public boolean equals(Object object) { + if (object == this) { + return true; + } + + if (!(object instanceof DelayFn)) { + return false; + } + + return this.delay.equals(((DelayFn) object).delay); + } + + @Override + public int hashCode() { + return Objects.hash(delay); + } + + @Override + public String toString() { + return PERIOD_FORMATTER.print(delay.toPeriod()); + } + } + + /** + * A {@link SerializableFunction} to align an instant to the next interval boundary at or after + * it. + */ + static final class AlignFn implements SerializableFunction<Instant, Instant> { + private final Duration size; + private final Instant offset; + + + /** + * Aligns timestamps to the smallest {@code offset + k * size} (for any integer {@code k}) + * that is not before the timestamp. + * + * @param size the distance between two boundaries; expected to be positive + * @param offset an instant that lies on a boundary + */ + public AlignFn(Duration size, Instant offset) { + this.size = size; + this.offset = offset; + } + + /** + * Returns {@code point} itself when it already lies on a boundary, otherwise the first + * boundary after it. Instants before {@code offset} are aligned the same way. + */ + @Override + public Instant apply(Instant point) { + long millisSinceStart = new Duration(offset, point).getMillis() % size.getMillis(); + // Java's % takes the sign of the dividend, so an instant before the offset yields a + // negative remainder; shift it into [0, size) so the next boundary is not overshot. + if (millisSinceStart < 0) { + millisSinceStart += size.getMillis(); + } + return millisSinceStart == 0 ? 
point : point.plus(size).minus(millisSinceStart); + } + + @Override + public boolean equals(Object object) { + if (object == this) { + return true; + } + + if (!(object instanceof AlignFn)) { + return false; + } + + AlignFn other = (AlignFn) object; + return other.size.equals(this.size) + && other.offset.equals(this.offset); + } + + @Override + public int hashCode() { + return Objects.hash(size, offset); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java new file mode 100644 index 0000000..140ac75 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.List; + +/** + * A composite {@link TriggerStateMachine} that executes its sub-triggers in order. + * Only one sub-trigger is executing at a time, + * and any time it fires the {@code AfterEach} fires. When the currently executing + * sub-trigger finishes, the {@code AfterEach} starts executing the next sub-trigger. + * + * <p>{@code AfterEach.inOrder(t1, t2, ...)} finishes when all of the sub-triggers have finished. + * + * <p>The following properties hold: + * <ul> + * <li> {@code AfterEach.inOrder(AfterEach.inOrder(a, b), c)} behaves the same as + * {@code AfterEach.inOrder(a, b, c)} and {@code AfterEach.inOrder(a, AfterEach.inOrder(b, c))}. + * <li> {@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same as + * {@code Repeatedly.forever(a)}, since the repeated trigger never finishes. + * </ul> + */ +public class AfterEachStateMachine extends TriggerStateMachine { + + private AfterEachStateMachine(List<TriggerStateMachine> subTriggers) { + super(subTriggers); + checkArgument(subTriggers.size() > 1); + } + + /** + * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers. + * + * @param triggers the sub-triggers, in the order they should run; at least two are required + * @throws IllegalArgumentException if fewer than two sub-triggers are given + */ + @SafeVarargs + public static TriggerStateMachine inOrder(TriggerStateMachine... 
triggers) { + return new AfterEachStateMachine(Arrays.<TriggerStateMachine>asList(triggers)); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + if (!c.trigger().isMerging()) { + // If merges are not possible, we need only run the first unfinished subtrigger + c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c); + } else { + // If merges are possible, we need to run all subtriggers in parallel + for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) { + // Even if the subTrigger is done, it may be revived via merging and must have + // adequate state. + subTrigger.invokeOnElement(c); + } + } + } + + @Override + public void onMerge(OnMergeContext context) throws Exception { + // If merging makes a subtrigger no-longer-finished, it will automatically + // begin participating in shouldFire and onFire appropriately. + + // All the following triggers are retroactively "not started" but that is + // also automatic because they are cleared whenever this trigger + // fires. + boolean priorTriggersAllFinished = true; + for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) { + if (priorTriggersAllFinished) { + subTrigger.invokeOnMerge(context); + priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished(); + } else { + subTrigger.invokeClear(context); + } + } + updateFinishedState(context); + } + + /** Delegates the decision to the first sub-trigger that has not finished yet. */ + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + // NOTE(review): the result of firstUnfinishedSubTrigger() is dereferenced without a null + // check, while updateFinishedState treats a null result as "every sub-trigger finished"; + // presumably this method is never invoked once this trigger is finished -- confirm. + ExecutableTriggerStateMachine firstUnfinished = context.trigger().firstUnfinishedSubTrigger(); + return firstUnfinished.invokeShouldFire(context); + } + + @Override + public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { + context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context); + + // Reset all subtriggers if in a merging context; any may be revived by merging so they are + // all run in parallel for each pending pane. 
+ if (context.trigger().isMerging()) { + for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) { + subTrigger.invokeClear(context); + } + } + + updateFinishedState(context); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterEach.inOrder("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } + + /** + * Marks this trigger as finished exactly when {@code firstUnfinishedSubTrigger()} returns + * {@code null}, and as unfinished otherwise. + */ + private void updateFinishedState(TriggerContext context) { + context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java new file mode 100644 index 0000000..272c278 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.sdk.annotations.Experimental; + +/** + * A composite {@link TriggerStateMachine} that fires once after at least one of its + * sub-triggers has fired. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class AfterFirstStateMachine extends OnceTriggerStateMachine { + + AfterFirstStateMachine(List<TriggerStateMachine> subTriggers) { + super(subTriggers); + checkArgument(subTriggers.size() > 1); + } + + /** + * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers. + * + * @param triggers the sub-triggers to watch; at least two are required + * @throws IllegalArgumentException if fewer than two sub-triggers are given + */ + @SafeVarargs + public static OnceTriggerStateMachine of( + OnceTriggerStateMachine... 
triggers) { + return new AfterFirstStateMachine(Arrays.<TriggerStateMachine>asList(triggers)); + } + + /** Forwards the element to every sub-trigger. */ + @Override + public void onElement(OnElementContext c) throws Exception { + for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) { + subTrigger.invokeOnElement(c); + } + } + + /** Merges every sub-trigger, then recomputes this trigger's finished bit. */ + @Override + public void onMerge(OnMergeContext c) throws Exception { + for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) { + subTrigger.invokeOnMerge(c); + } + updateFinishedStatus(c); + } + + /** + * Returns true as soon as any sub-trigger is already finished or reports that it should fire. + */ + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { + if (context.forTrigger(subtrigger).trigger().isFinished() + || subtrigger.invokeShouldFire(context)) { + return true; + } + } + return false; + } + + @Override + protected void onOnlyFiring(TriggerContext context) throws Exception { + for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { + TriggerContext subContext = context.forTrigger(subtrigger); + if (subtrigger.invokeShouldFire(subContext)) { + // If the trigger is ready to fire, then do whatever it needs to do. + subtrigger.invokeOnFire(subContext); + } else { + // If the trigger is not ready to fire, it is nonetheless true that whatever + // pending pane it was tracking is now gone. 
+ subtrigger.invokeClear(subContext); + } + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterFirst.of("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } + + /** Marks this trigger as finished when at least one sub-trigger is finished. */ + private void updateFinishedStatus(TriggerContext c) { + boolean anyFinished = false; + for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) { + anyFinished |= c.forTrigger(subTrigger).trigger().isFinished(); + } + c.trigger().setFinished(anyFinished); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java new file mode 100644 index 0000000..723aba6 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Objects; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateMerging; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; + +/** + * {@link TriggerStateMachine}s that fire based on properties of the elements in the current pane. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class AfterPaneStateMachine extends OnceTriggerStateMachine { + + /** + * State tag for the element counter: {@code onElement} adds 1 per element and the values are + * combined with {@link Sum.SumLongFn}. + */ +private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> + ELEMENTS_IN_PANE_TAG = + StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( + "count", VarLongCoder.of(), new Sum.SumLongFn())); + + /** Minimum counter value at which {@code shouldFire} returns true. */ + private final int countElems; + + private AfterPaneStateMachine(int countElems) { + super(null); + this.countElems = countElems; + } + + /** + * Creates a trigger that fires when the pane contains at least {@code countElems} elements. 
+ */ + public static AfterPaneStateMachine elementCountAtLeast(int countElems) { + return new AfterPaneStateMachine(countElems); + } + + /** Counts the element by adding 1 to the counter state. */ + @Override + public void onElement(OnElementContext c) throws Exception { + c.state().access(ELEMENTS_IN_PANE_TAG).add(1L); + } + + @Override + public void prefetchOnMerge(MergingStateAccessor<?, ?> state) { + super.prefetchOnMerge(state); + StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG); + } + + @Override + public void onMerge(OnMergeContext context) throws Exception { + // If we've already received enough elements and finished in some window, + // then this trigger is just finished. + if (context.trigger().finishedInAnyMergingWindow()) { + context.trigger().setFinished(true); + StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG); + return; + } + + // Otherwise, compute the sum of elements in all the active panes. + StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG); + } + + @Override + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = + "prefetch side effect") + public void prefetchShouldFire(StateAccessor<?> state) { + state.access(ELEMENTS_IN_PANE_TAG).readLater(); + } + + /** Returns true once the counter has reached {@code countElems}. */ + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + // NOTE(review): read() is unboxed straight into a primitive long, so this assumes the + // combining state never returns null -- confirm. + long count = context.state().access(ELEMENTS_IN_PANE_TAG).read(); + return count >= countElems; + } + + /** Discards the counter state. */ + @Override + public void clear(TriggerContext c) throws Exception { + c.state().access(ELEMENTS_IN_PANE_TAG).clear(); + } + + /** Compatible only with an {@code AfterPaneStateMachine} that is equal to this one. */ + @Override + public boolean isCompatible(TriggerStateMachine other) { + return this.equals(other); + } + + @Override + public String toString() { + return "AfterPane.elementCountAtLeast(" + countElems + ")"; + } + + /** Two instances are equal when their {@code countElems} values match. */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof AfterPaneStateMachine)) { + return false; + } + AfterPaneStateMachine that = (AfterPaneStateMachine) obj; + return this.countElems 
== that.countElems; + } + + @Override + public int hashCode() { + return Objects.hash(countElems); + } + + /** Firing uses up the counted elements, so the counter state is cleared. */ + @Override + protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { + clear(context); + } +}