http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java new file mode 100644 index 0000000..a960aa4 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java @@ -0,0 +1,527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.common.base.Joiner; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.util.ExecutableTrigger; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.joda.time.Instant; + +/** + * {@code Trigger}s control when the elements for a specific key and window are output. As elements + * arrive, they are put into one or more windows by a {@link Window} transform and its associated + * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the + * {@code Window}s contents should be output. + * + * <p>See {@link org.apache.beam.sdk.transforms.GroupByKey} and {@link Window} + * for more information about how grouping with windows works. + * + * <p>The elements that are assigned to a window since the last time it was fired (or since the + * window was created) are placed into the current window pane. Triggers are evaluated against the + * elements as they are added. When the root trigger fires, the elements in the current pane will be + * output. When the root trigger finishes (indicating it will never fire again), the window is + * closed and any new elements assigned to that window are discarded. + * + * <p>Several predefined {@code Trigger}s are provided: + * <ul> + * <li> {@link AfterWatermark} for firing when the watermark passes a timestamp determined from + * either the end of the window or the arrival of the first element in a pane. + * <li> {@link AfterProcessingTime} for firing after some amount of processing time has elapsed + * (typically since the first element in a pane). 
+ * <li> {@link AfterPane} for firing off a property of the elements in the current pane, such as + * the number of elements that have been assigned to the current pane. + * </ul> + * + * <p>In addition, {@code Trigger}s can be combined in a variety of ways: + * <ul> + * <li> {@link Repeatedly#forever} to create a trigger that executes forever. Any time its + * argument finishes it gets reset and starts over. Can be combined with + * {@link Trigger#orFinally} to specify a condition that causes the repetition to stop. + * <li> {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every) + * time that a trigger fires, and advancing to the next trigger in the sequence when it finishes. + * <li> {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments + * fires. An {@link AfterFirst} trigger finishes after it fires once. + * <li> {@link AfterAll#of} to create a trigger that fires after all of its arguments + * have fired at least once. An {@link AfterAll} trigger finishes after it fires once. + * </ul> + * + * <p>Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one + * of the following states: + * <ul> + * <li> Never Existed - before the trigger has started executing, there is no state associated + * with it anywhere in the system. A trigger moves to the executing state as soon as it + * processes in the current pane. + * <li> Executing - while the trigger is receiving items and may fire. While it is in this state, + * it may persist book-keeping information to persisted state, set timers, etc. + * <li> Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the + * system remembers only that it is finished. Entering this state causes us to discard any + * elements in the buffer for that window, as well. 
+ * </ul> + * + * <p>Once finished, a trigger cannot return itself back to an earlier state, however a composite + * trigger could reset its sub-triggers. + * + * <p>Triggers should not build up any state internally since they may be recreated + * between invocations of the callbacks. All important values should be persisted using + * state before the callback returns. + */ +@Experimental(Experimental.Kind.TRIGGER) +public abstract class Trigger implements Serializable { + + /** + * Interface for accessing information about the trigger being executed and other triggers in the + * same tree. + */ + public interface TriggerInfo { + + /** + * Returns true if the windowing strategy of the current {@code PCollection} is a merging + * WindowFn. If true, the trigger execution needs to keep enough information to support the + * possibility of {@link Trigger#onMerge} being called. If false, {@link Trigger#onMerge} will + * never be called. + */ + boolean isMerging(); + + /** + * Access the executable versions of the sub-triggers of the current trigger. + */ + Iterable<ExecutableTrigger> subTriggers(); + + /** + * Access the executable version of the specified sub-trigger. + */ + ExecutableTrigger subTrigger(int subtriggerIndex); + + /** + * Returns true if the current trigger is marked finished. + */ + boolean isFinished(); + + /** + * Return true if the given subtrigger is marked finished. + */ + boolean isFinished(int subtriggerIndex); + + /** + * Returns true if all the sub-triggers of the current trigger are marked finished. + */ + boolean areAllSubtriggersFinished(); + + /** + * Returns an iterable over the unfinished sub-triggers of the current trigger. + */ + Iterable<ExecutableTrigger> unfinishedSubTriggers(); + + /** + * Returns the first unfinished sub-trigger. + */ + ExecutableTrigger firstUnfinishedSubTrigger(); + + /** + * Clears all keyed state for triggers in the current sub-tree and unsets all the associated + * finished bits. 
+ */ + void resetTree() throws Exception; + + /** + * Sets the finished bit for the current trigger. + */ + void setFinished(boolean finished); + + /** + * Sets the finished bit for the given sub-trigger. + */ + void setFinished(boolean finished, int subTriggerIndex); + } + + /** + * Interact with properties of the trigger being executed, with extensions to deal with the + * merging windows. + */ + public interface MergingTriggerInfo extends TriggerInfo { + + /** Return true if the trigger is finished in any window being merged. */ + public abstract boolean finishedInAnyMergingWindow(); + + /** Return true if the trigger is finished in all windows being merged. */ + public abstract boolean finishedInAllMergingWindows(); + } + + /** + * Information accessible to all operational hooks in this {@code Trigger}. + * + * <p>Used directly in {@link Trigger#shouldFire} and {@link Trigger#clear}, and + * extended with additional information in other methods. + */ + public abstract class TriggerContext { + + /** Returns the interface for accessing trigger info. */ + public abstract TriggerInfo trigger(); + + /** Returns the interface for accessing persistent state. */ + public abstract StateAccessor<?> state(); + + /** The window that the current context is executing in. */ + public abstract BoundedWindow window(); + + /** Create a sub-context for the given sub-trigger. */ + public abstract TriggerContext forTrigger(ExecutableTrigger trigger); + + /** + * Removes the timer set in this trigger context for the given {@link Instant} + * and {@link TimeDomain}. + */ + public abstract void deleteTimer(Instant timestamp, TimeDomain domain); + + /** The current processing time. */ + public abstract Instant currentProcessingTime(); + + /** The current synchronized upstream processing time or {@code null} if unknown. */ + @Nullable + public abstract Instant currentSynchronizedProcessingTime(); + + /** The current event time for the input or {@code null} if unknown. 
*/ + @Nullable + public abstract Instant currentEventTime(); + } + + /** + * Extended {@link TriggerContext} containing information accessible to the {@link #onElement} + * operational hook. + */ + public abstract class OnElementContext extends TriggerContext { + /** The event timestamp of the element currently being processed. */ + public abstract Instant eventTimestamp(); + + /** + * Sets a timer to fire when the watermark or processing time is beyond the given timestamp. + * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards. + * + * <p>As with {@link #state}, timers are implicitly scoped to the current window. All + * timer firings for a window will be received, but the implementation should choose to ignore + * those that are not applicable. + * + * @param timestamp the time at which the trigger should be re-evaluated + * @param domain the domain that the {@code timestamp} applies to + */ + public abstract void setTimer(Instant timestamp, TimeDomain domain); + + /** Create an {@code OnElementContext} for executing the given trigger. */ + @Override + public abstract OnElementContext forTrigger(ExecutableTrigger trigger); + } + + /** + * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge} + * operational hook. + */ + public abstract class OnMergeContext extends TriggerContext { + /** + * Sets a timer to fire when the watermark or processing time is beyond the given timestamp. + * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards. + * + * <p>As with {@link #state}, timers are implicitly scoped to the current window. All + * timer firings for a window will be received, but the implementation should choose to ignore + * those that are not applicable. 
+ * + * @param timestamp the time at which the trigger should be re-evaluated + * @param domain the domain that the {@code timestamp} applies to + */ + public abstract void setTimer(Instant timestamp, TimeDomain domain); + + /** Create an {@code OnMergeContext} for executing the given trigger. */ + @Override + public abstract OnMergeContext forTrigger(ExecutableTrigger trigger); + + @Override + public abstract MergingStateAccessor<?, ?> state(); + + @Override + public abstract MergingTriggerInfo trigger(); + } + + @Nullable + protected final List<Trigger> subTriggers; + + protected Trigger(@Nullable List<Trigger> subTriggers) { + this.subTriggers = subTriggers; + } + + + /** + * Called every time an element is incorporated into a window. + */ + public abstract void onElement(OnElementContext c) throws Exception; + + /** + * Called immediately after windows have been merged. + * + * <p>Leaf triggers should update their state by inspecting their status and any state + * in the merging windows. Composite triggers should update their state by calling + * {@link ExecutableTrigger#invokeOnMerge} on their sub-triggers, and applying appropriate logic. + * + * <p>A trigger such as {@link AfterWatermark#pastEndOfWindow} may no longer be finished; + * it is the responsibility of the trigger itself to record this fact. It is forbidden for + * a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending + * elements that led to it being ready to fire. + * + * <p>The implementation does not need to clear out any state associated with the old windows. + */ + public abstract void onMerge(OnMergeContext c) throws Exception; + + /** + * Returns {@code true} if the current state of the trigger indicates that its condition + * is satisfied and it is ready to fire. + */ + public abstract boolean shouldFire(TriggerContext context) throws Exception; + + /** + * Adjusts the state of the trigger to be ready for the next pane. 
For example, a + * {@link Repeatedly} trigger will reset its inner trigger, since it has fired. + * + * <p>If the trigger is finished, it is the responsibility of the trigger itself to + * record that fact via the {@code context}. + */ + public abstract void onFire(TriggerContext context) throws Exception; + + /** + * Called to allow the trigger to prefetch any state it will likely need to read from during + * an {@link #onElement} call. + */ + public void prefetchOnElement(StateAccessor<?> state) { + if (subTriggers != null) { + for (Trigger trigger : subTriggers) { + trigger.prefetchOnElement(state); + } + } + } + + /** + * Called to allow the trigger to prefetch any state it will likely need to read from during + * an {@link #onMerge} call. + */ + public void prefetchOnMerge(MergingStateAccessor<?, ?> state) { + if (subTriggers != null) { + for (Trigger trigger : subTriggers) { + trigger.prefetchOnMerge(state); + } + } + } + + /** + * Called to allow the trigger to prefetch any state it will likely need to read from during + * an {@link #shouldFire} call. + */ + public void prefetchShouldFire(StateAccessor<?> state) { + if (subTriggers != null) { + for (Trigger trigger : subTriggers) { + trigger.prefetchShouldFire(state); + } + } + } + + /** + * Called to allow the trigger to prefetch any state it will likely need to read from during + * an {@link #onFire} call. + */ + public void prefetchOnFire(StateAccessor<?> state) { + if (subTriggers != null) { + for (Trigger trigger : subTriggers) { + trigger.prefetchOnFire(state); + } + } + } + + /** + * Clear any state associated with this trigger in the given window. + * + * <p>This is called after a trigger has indicated it will never fire again. The trigger system + * keeps enough information to know that the trigger is finished, so this trigger should clear all + * of its state. 
+ */ + public void clear(TriggerContext c) throws Exception { + if (subTriggers != null) { + for (ExecutableTrigger trigger : c.trigger().subTriggers()) { + trigger.invokeClear(c); + } + } + } + /** + * Returns the sub-trigger list that was given to the constructor, without copying it. The + * result is {@code null} when this trigger was constructed with a {@code null} list, so callers + * must check for {@code null} before iterating. + */ + public Iterable<Trigger> subTriggers() { + return subTriggers; + } + + /** + * Return a trigger to use after a {@code GroupByKey} to preserve the + * intention of this trigger. Specifically, triggers that are time based + * and intended to provide speculative results should continue providing + * speculative results. Triggers that fire once (or multiple times) should + * continue firing once (or multiple times). + * + * <p>This implementation computes the continuation of every sub-trigger, in list order, and + * passes the resulting list to {@link #getContinuationTrigger(List)}. When this trigger has no + * sub-trigger list ({@code subTriggers} is {@code null}), it passes {@code null} instead. + */ + public Trigger getContinuationTrigger() { + if (subTriggers == null) { + return getContinuationTrigger(null); + } + + List<Trigger> subTriggerContinuations = new ArrayList<>(); + for (Trigger subTrigger : subTriggers) { + subTriggerContinuations.add(subTrigger.getContinuationTrigger()); + } + return getContinuationTrigger(subTriggerContinuations); + } + + /** + * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For convenience, this + * is provided the continuation trigger of each of the sub-triggers. + * + * @param continuationTriggers the continuation of each sub-trigger, in the same order as the + * sub-triggers, or {@code null} when this trigger has no sub-trigger list + */ + protected abstract Trigger getContinuationTrigger(List<Trigger> continuationTriggers); + + /** + * Returns a bound in watermark time by which this trigger would have fired at least once + * for a given window had there been input data. This is a static property of a trigger + * that does not depend on its state. + * + * <p>For triggers that do not fire based on the watermark advancing, returns + * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. + * + * <p>This estimate is used to determine that there are no elements in a side-input window, which + * causes the default value to be used instead. + */ + public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window); + + /** + * Returns whether this performs the same triggering as the given {@code Trigger}. 
+ */ + public boolean isCompatible(Trigger other) { + if (!getClass().equals(other.getClass())) { + return false; + } + + if (subTriggers == null) { + return other.subTriggers == null; + } else if (other.subTriggers == null) { + return false; + } else if (subTriggers.size() != other.subTriggers.size()) { + return false; + } + + for (int i = 0; i < subTriggers.size(); i++) { + if (!subTriggers.get(i).isCompatible(other.subTriggers.get(i))) { + return false; + } + } + + return true; + } + + @Override + public String toString() { + String simpleName = getClass().getSimpleName(); + if (getClass().getEnclosingClass() != null) { + simpleName = getClass().getEnclosingClass().getSimpleName() + "." + simpleName; + } + if (subTriggers == null || subTriggers.size() == 0) { + return simpleName; + } else { + return simpleName + "(" + Joiner.on(", ").join(subTriggers) + ")"; + } + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof Trigger)) { + return false; + } + Trigger that = (Trigger) obj; + return Objects.equals(getClass(), that.getClass()) + && Objects.equals(subTriggers, that.subTriggers); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), subTriggers); + } + + /** + * Specify an ending condition for this trigger. If the {@code until} fires then the combination + * fires. + * + * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes + * as soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time + * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that + * {@code t1} may have fired since {@code t2} started, so not all of the elements that {@code t2} + * has seen are necessarily in the current pane. 
+ * + * <p>For example the final firing of the following trigger may only have 1 element: + * <pre> {@code + * Repeatedly.forever(AfterPane.elementCountAtLeast(2)) + * .orFinally(AfterPane.elementCountAtLeast(5)) + * } </pre> + * + * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same + * as {@code AfterFirst.of(t1, t2)}. + */ + public Trigger orFinally(OnceTrigger until) { + return new OrFinallyTrigger(this, until); + } + + /** + * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather + * than the general {@link Trigger} class to indicate that behavior. + */ + public abstract static class OnceTrigger extends Trigger { + protected OnceTrigger(List<Trigger> subTriggers) { + super(subTriggers); + } + /** + * Narrows the result of {@link Trigger#getContinuationTrigger()} to {@code OnceTrigger}. + * + * @throws IllegalStateException if the computed continuation is not a {@code OnceTrigger} + */ + @Override + public final OnceTrigger getContinuationTrigger() { + Trigger continuation = super.getContinuationTrigger(); + if (!(continuation instanceof OnceTrigger)) { + throw new IllegalStateException("Continuation of a OnceTrigger must be a OnceTrigger"); + } + return (OnceTrigger) continuation; + } + + /** + * {@inheritDoc} + * + * <p>This implementation calls {@link #onOnlyFiring} and then marks this trigger finished via + * {@code context.trigger().setFinished(true)}. It is {@code final}; subclasses customize the + * firing through {@link #onOnlyFiring} instead. + */ + @Override + public final void onFire(TriggerContext context) throws Exception { + onOnlyFiring(context); + context.trigger().setFinished(true); + } + + /** + * Called by {@link #onFire} each time it runs, immediately before the trigger is marked + * finished. This method is abstract in {@code OnceTrigger}, so there is no default behavior + * here; every concrete subclass must supply it. + * + * <p>NOTE(review): this comment previously described a default of invoking {@link #onFire} on + * all subtriggers for which {@link #shouldFire} is {@code true}. That is presumably what + * composite subclasses are expected to do; confirm against the subclasses. + */ + protected abstract void onOnlyFiring(TriggerContext context) throws Exception; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java new file mode 100644 index 0000000..088c499 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; + +/** + * A wrapper around a trigger used during execution. 
While an actual trigger may appear multiple + * times (both in the same trigger expression and in other trigger expressions), the + * {@code ExecutableTrigger} wrapped around them forms a tree (only one occurrence). + */ +public class ExecutableTrigger implements Serializable { + + /** Store the index assigned to this trigger. */ + private final int triggerIndex; + private final int firstIndexAfterSubtree; + private final List<ExecutableTrigger> subTriggers = new ArrayList<>(); + private final Trigger trigger; + /** + * Wraps the given trigger and, recursively, its sub-triggers, giving the root index 0. + * The type parameter {@code W} is not referenced by the parameters, the result, or the body. + */ + public static <W extends BoundedWindow> ExecutableTrigger create(Trigger trigger) { + return create(trigger, 0); + } + /** + * Wraps the given trigger using {@code nextUnusedIndex} as its index. A specification that is a + * {@link OnceTrigger} is wrapped in an {@code ExecutableOnceTrigger}; any other specification is + * wrapped in a plain {@code ExecutableTrigger}. + */ + private static <W extends BoundedWindow> ExecutableTrigger create( + Trigger trigger, int nextUnusedIndex) { + if (trigger instanceof OnceTrigger) { + return new ExecutableOnceTrigger((OnceTrigger) trigger, nextUnusedIndex); + } else { + return new ExecutableTrigger(trigger, nextUnusedIndex); + } + } + /** + * Wraps the given {@link OnceTrigger} in an {@code ExecutableOnceTrigger}, using + * {@code nextUnusedIndex} as its index. + */ + public static <W extends BoundedWindow> ExecutableTrigger createForOnceTrigger( + OnceTrigger trigger, int nextUnusedIndex) { + return new ExecutableOnceTrigger(trigger, nextUnusedIndex); + } + /** + * Builds the executable tree for {@code trigger}. This trigger takes {@code nextUnusedIndex} as + * its own index; each sub-trigger of the specification is then wrapped in list order, starting + * at the first index not used by the previous sub-trigger's subtree. After the loop, + * {@code firstIndexAfterSubtree} holds the first index not used by this trigger or anything + * below it. A specification whose {@code subTriggers()} is {@code null} gets no sub-triggers. + * + * @throws NullPointerException if {@code trigger} is {@code null} + */ + private ExecutableTrigger(Trigger trigger, int nextUnusedIndex) { + this.trigger = checkNotNull(trigger, "trigger must not be null"); + this.triggerIndex = nextUnusedIndex++; + + if (trigger.subTriggers() != null) { + for (Trigger subTrigger : trigger.subTriggers()) { + ExecutableTrigger subExecutable = create(subTrigger, nextUnusedIndex); + subTriggers.add(subExecutable); + nextUnusedIndex = subExecutable.firstIndexAfterSubtree; + } + } + firstIndexAfterSubtree = nextUnusedIndex; + } + /** + * Returns the executable sub-triggers in the order of the specification's sub-triggers. The + * list is empty, never {@code null}, when the specification has none. The internal list is + * returned directly, without a defensive copy. + */ + public List<ExecutableTrigger> subTriggers() { + return subTriggers; + } + /** + * Returns the string form of the wrapped trigger specification. + */ + @Override + public String toString() { + return trigger.toString(); + } + + /** + * Return the underlying trigger specification corresponding to this {@code ExecutableTrigger}. 
+ */ + public Trigger getSpec() { + return trigger; + } + /** + * Returns the index assigned to this trigger when it was constructed. + */ + public int getTriggerIndex() { + return triggerIndex; + } + /** + * Returns the first index that is not used by this trigger or by anything in its subtree, as + * computed when this trigger was constructed. + */ + public final int getFirstIndexAfterSubtree() { + return firstIndexAfterSubtree; + } + /** + * Returns whether the wrapped trigger specifications are compatible, by delegating to + * {@link Trigger#isCompatible}. + */ + public boolean isCompatible(ExecutableTrigger other) { + return trigger.isCompatible(other.trigger); + } + /** + * Returns the direct sub-trigger whose subtree contains the given index: the last direct + * sub-trigger, in list order, whose own index is not greater than {@code index}. + * + * @param index an index strictly greater than this trigger's own index and strictly less than + * {@link #getFirstIndexAfterSubtree}; any other value fails the {@code checkState} below + */ + public ExecutableTrigger getSubTriggerContaining(int index) { + checkNotNull(subTriggers); + checkState(index > triggerIndex && index < firstIndexAfterSubtree, + "Cannot find sub-trigger containing index not in this tree."); + ExecutableTrigger previous = null; + for (ExecutableTrigger subTrigger : subTriggers) { + if (index < subTrigger.triggerIndex) { + return previous; + } + previous = subTrigger; + } + return previous; + } + + /** + * Invoke the {@link Trigger#onElement} method for this trigger, passing it the context that + * {@code c.forTrigger(this)} creates for this trigger. + */ + public void invokeOnElement(Trigger.OnElementContext c) throws Exception { + trigger.onElement(c.forTrigger(this)); + } + + /** + * Invoke the {@link Trigger#onMerge} method for this trigger, passing it the context that + * {@code c.forTrigger(this)} creates for this trigger. + */ + public void invokeOnMerge(Trigger.OnMergeContext c) throws Exception { + Trigger.OnMergeContext subContext = c.forTrigger(this); + trigger.onMerge(subContext); + } + /** + * Invoke the {@link Trigger#shouldFire} method for this trigger with the context that + * {@code c.forTrigger(this)} creates, and return its result. + */ + public boolean invokeShouldFire(Trigger.TriggerContext c) throws Exception { + return trigger.shouldFire(c.forTrigger(this)); + } + /** + * Invoke the {@link Trigger#onFire} method for this trigger with the context that + * {@code c.forTrigger(this)} creates. + */ + public void invokeOnFire(Trigger.TriggerContext c) throws Exception { + trigger.onFire(c.forTrigger(this)); + } + + /** + * Invoke the {@link Trigger#clear} method for this trigger with the context that + * {@code c.forTrigger(this)} creates. + */ + public void invokeClear(Trigger.TriggerContext c) throws Exception { + trigger.clear(c.forTrigger(this)); + } + + /** + * {@link ExecutableTrigger} that enforces the fact that the trigger should always FIRE_AND_FINISH + * and never just FIRE. 
+ */ + private static class ExecutableOnceTrigger extends ExecutableTrigger { + + public ExecutableOnceTrigger(OnceTrigger trigger, int nextUnusedIndex) { + super(trigger, nextUnusedIndex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java new file mode 100644 index 0000000..6666ab9 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +/** + * A mutable set which tracks whether any particular {@link ExecutableTrigger} is + * finished. + */ +public interface FinishedTriggers { + /** + * Returns {@code true} if the trigger is finished. + */ + public boolean isFinished(ExecutableTrigger trigger); + + /** + * Sets the fact that the trigger is finished. 
+ */ + public void setFinished(ExecutableTrigger trigger, boolean value); + + /** + * Sets the trigger and all of its subtriggers to unfinished. + */ + public void clearRecursively(ExecutableTrigger trigger); + + /** + * Create an independent copy of this mutable {@link FinishedTriggers}. + */ + public FinishedTriggers copy(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java new file mode 100644 index 0000000..4cd617f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.util.BitSet; + +/** + * A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}. 
+ */ +public class FinishedTriggersBitSet implements FinishedTriggers { + + /** Bit {@code i} is set when the trigger whose index is {@code i} is finished. */ private final BitSet bitSet; + + private FinishedTriggersBitSet(BitSet bitSet) { + this.bitSet = bitSet; + } + /** + * Creates an instance with no trigger marked finished, backed by a new + * {@code BitSet(capacity)}. + */ + public static FinishedTriggersBitSet emptyWithCapacity(int capacity) { + return new FinishedTriggersBitSet(new BitSet(capacity)); + } + /** + * Wraps the given {@link BitSet} without copying it, so later changes made through either + * object are visible through the other. + */ + public static FinishedTriggersBitSet fromBitSet(BitSet bitSet) { + return new FinishedTriggersBitSet(bitSet); + } + + /** + * Returns the underlying {@link BitSet} for this {@link FinishedTriggersBitSet}. The set itself + * is returned, not a copy. + */ + public BitSet getBitSet() { + return bitSet; + } + /** + * Returns the bit stored at the index of the given trigger. + */ + @Override + public boolean isFinished(ExecutableTrigger trigger) { + return bitSet.get(trigger.getTriggerIndex()); + } + /** + * Sets the bit stored at the index of the given trigger to {@code value}. + */ + @Override + public void setFinished(ExecutableTrigger trigger, boolean value) { + bitSet.set(trigger.getTriggerIndex(), value); + } + /** + * Clears every bit from the trigger's own index (inclusive) up to its + * {@code getFirstIndexAfterSubtree()} (exclusive) with a single range clear. + */ + @Override + public void clearRecursively(ExecutableTrigger trigger) { + bitSet.clear(trigger.getTriggerIndex(), trigger.getFirstIndexAfterSubtree()); + } + /** + * Returns a new instance backed by a clone of the underlying {@link BitSet}. + */ + @Override + public FinishedTriggersBitSet copy() { + return new FinishedTriggersBitSet((BitSet) bitSet.clone()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java new file mode 100644 index 0000000..a9feb73 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.common.collect.Sets; +import java.util.Set; + +/** + * An implementation of {@link FinishedTriggers} atop a user-provided mutable {@link Set}. + */ +public class FinishedTriggersSet implements FinishedTriggers { + + private final Set<ExecutableTrigger> finishedTriggers; + + private FinishedTriggersSet(Set<ExecutableTrigger> finishedTriggers) { + this.finishedTriggers = finishedTriggers; + } + + public static FinishedTriggersSet fromSet(Set<ExecutableTrigger> finishedTriggers) { + return new FinishedTriggersSet(finishedTriggers); + } + + /** + * Returns a mutable {@link Set} of the underlying triggers that are finished. 
+ */ + public Set<ExecutableTrigger> getFinishedTriggers() { + return finishedTriggers; + } + + @Override + public boolean isFinished(ExecutableTrigger trigger) { + return finishedTriggers.contains(trigger); + } + + @Override + public void setFinished(ExecutableTrigger trigger, boolean value) { + if (value) { + finishedTriggers.add(trigger); + } else { + finishedTriggers.remove(trigger); + } + } + + @Override + public void clearRecursively(ExecutableTrigger trigger) { + finishedTriggers.remove(trigger); + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + clearRecursively(subTrigger); + } + } + + @Override + public FinishedTriggersSet copy() { + return fromSet(Sets.newHashSet(finishedTriggers)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java new file mode 100644 index 0000000..9e2c27d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.joda.time.Instant; + +/** + * The trigger used with {@link Reshuffle} which triggers on every element + * and never buffers state. + * + * @param <W> The kind of window that is being reshuffled. + */ +public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger { + + public ReshuffleTrigger() { + super(null); + } + + @Override + public void onElement(Trigger.OnElementContext c) { } + + @Override + public void onMerge(Trigger.OnMergeContext c) { } + + @Override + protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + return this; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + throw new UnsupportedOperationException( + "ReshuffleTrigger should not be used outside of Reshuffle"); + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + return true; + } + + @Override + public void onFire(Trigger.TriggerContext context) throws Exception { } + + @Override + public String toString() { + return "ReshuffleTrigger()"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java new file mode 100644 index 0000000..e09aac2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import java.util.Collection; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Trigger.MergingTriggerInfo; +import org.apache.beam.sdk.transforms.windowing.Trigger.TriggerInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateTag; +import org.joda.time.Instant; + +/** + * Factory for creating instances of the various {@link Trigger} contexts. 
+ * + * <p>These contexts are highly interdependent and share many fields; it is inadvisable + * to create them via any means other than this factory class. + */ +public class TriggerContextFactory<W extends BoundedWindow> { + + private final WindowFn<?, W> windowFn; + private final StateInternals<?> stateInternals; + private final Coder<W> windowCoder; + + /** + * Creates a factory bound to the given window function and state internals; the window coder + * is taken from {@code windowFn}. + * + * <p>{@code activeWindows} is accepted but not currently read by this constructor. + */ + public TriggerContextFactory(WindowFn<?, W> windowFn, + StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) { + // Future triggers may be able to exploit the active window to state address window mapping. + this.windowFn = windowFn; + this.stateInternals = stateInternals; + this.windowCoder = windowFn.windowCoder(); + } + + public Trigger.TriggerContext base(W window, Timers timers, + ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) { + return new TriggerContextImpl(window, timers, rootTrigger, finishedSet); + } + + public Trigger.OnElementContext createOnElementContext( + W window, Timers timers, Instant elementTimestamp, + ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) { + return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp); + } + + public Trigger.OnMergeContext createOnMergeContext(W window, Timers timers, + ExecutableTrigger rootTrigger, FinishedTriggers finishedSet, + Map<W, FinishedTriggers> finishedSets) { + return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets); + } + + public StateAccessor<?> createStateAccessor(W window, ExecutableTrigger trigger) { + return new StateAccessorImpl(window, trigger); + } + + public MergingStateAccessor<?, W> createMergingStateAccessor( + W mergeResult, Collection<W> mergingWindows, ExecutableTrigger trigger) { + return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult); + } + + private class TriggerInfoImpl implements Trigger.TriggerInfo { + + protected final ExecutableTrigger trigger; + protected final FinishedTriggers finishedSet; + private final 
Trigger.TriggerContext context; + + public TriggerInfoImpl(ExecutableTrigger trigger, FinishedTriggers finishedSet, + Trigger.TriggerContext context) { + this.trigger = trigger; + this.finishedSet = finishedSet; + this.context = context; + } + + @Override + public boolean isMerging() { + return !windowFn.isNonMerging(); + } + + @Override + public Iterable<ExecutableTrigger> subTriggers() { + return trigger.subTriggers(); + } + + @Override + public ExecutableTrigger subTrigger(int subtriggerIndex) { + return trigger.subTriggers().get(subtriggerIndex); + } + + @Override + public boolean isFinished() { + return finishedSet.isFinished(trigger); + } + + @Override + public boolean isFinished(int subtriggerIndex) { + return finishedSet.isFinished(subTrigger(subtriggerIndex)); + } + + @Override + public boolean areAllSubtriggersFinished() { + return Iterables.isEmpty(unfinishedSubTriggers()); + } + + @Override + public Iterable<ExecutableTrigger> unfinishedSubTriggers() { + return FluentIterable + .from(trigger.subTriggers()) + .filter(new Predicate<ExecutableTrigger>() { + @Override + public boolean apply(ExecutableTrigger trigger) { + return !finishedSet.isFinished(trigger); + } + }); + } + + @Override + public ExecutableTrigger firstUnfinishedSubTrigger() { + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + if (!finishedSet.isFinished(subTrigger)) { + return subTrigger; + } + } + return null; + } + + @Override + public void resetTree() throws Exception { + finishedSet.clearRecursively(trigger); + trigger.invokeClear(context); + } + + @Override + public void setFinished(boolean finished) { + finishedSet.setFinished(trigger, finished); + } + + @Override + public void setFinished(boolean finished, int subTriggerIndex) { + finishedSet.setFinished(subTrigger(subTriggerIndex), finished); + } + } + + private class TriggerTimers implements Timers { + + private final Timers timers; + private final W window; + + public TriggerTimers(W window, Timers timers) { + 
this.timers = timers; + this.window = window; + } + + @Override + public void setTimer(Instant timestamp, TimeDomain timeDomain) { + timers.setTimer(timestamp, timeDomain); + } + + @Override + public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { + if (timeDomain == TimeDomain.EVENT_TIME + && timestamp.equals(window.maxTimestamp())) { + // Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time + // state transitions. + return; + } + timers.deleteTimer(timestamp, timeDomain); + } + + @Override + public Instant currentProcessingTime() { + return timers.currentProcessingTime(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timers.currentSynchronizedProcessingTime(); + } + + @Override + public Instant currentEventTime() { + return timers.currentEventTime(); + } + } + + private class MergingTriggerInfoImpl + extends TriggerInfoImpl implements Trigger.MergingTriggerInfo { + + private final Map<W, FinishedTriggers> finishedSets; + + public MergingTriggerInfoImpl( + ExecutableTrigger trigger, + FinishedTriggers finishedSet, + Trigger.TriggerContext context, + Map<W, FinishedTriggers> finishedSets) { + super(trigger, finishedSet, context); + this.finishedSets = finishedSets; + } + + @Override + public boolean finishedInAnyMergingWindow() { + for (FinishedTriggers finishedSet : finishedSets.values()) { + if (finishedSet.isFinished(trigger)) { + return true; + } + } + return false; + } + + @Override + public boolean finishedInAllMergingWindows() { + for (FinishedTriggers finishedSet : finishedSets.values()) { + if (!finishedSet.isFinished(trigger)) { + return false; + } + } + return true; + } + } + + private class StateAccessorImpl implements StateAccessor<Object> { + protected final int triggerIndex; + protected final StateNamespace windowNamespace; + + public StateAccessorImpl( + W window, + ExecutableTrigger trigger) { + this.triggerIndex = trigger.getTriggerIndex(); + 
this.windowNamespace = namespaceFor(window); + } + + protected StateNamespace namespaceFor(W window) { + return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex); + } + + @Override + public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) { + return stateInternals.state(windowNamespace, address); + } + } + + private class MergingStateAccessorImpl extends StateAccessorImpl + implements MergingStateAccessor<Object, W> { + private final Collection<W> activeToBeMerged; + + public MergingStateAccessorImpl(ExecutableTrigger trigger, Collection<W> activeToBeMerged, + W mergeResult) { + super(mergeResult, trigger); + this.activeToBeMerged = activeToBeMerged; + } + + @Override + public <StateT extends State> StateT access( + StateTag<? super Object, StateT> address) { + return stateInternals.state(windowNamespace, address); + } + + @Override + public <StateT extends State> Map<W, StateT> accessInEachMergingWindow( + StateTag<? super Object, StateT> address) { + ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder(); + for (W mergingWindow : activeToBeMerged) { + StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address); + builder.put(mergingWindow, stateForWindow); + } + return builder.build(); + } + } + + private class TriggerContextImpl extends Trigger.TriggerContext { + + private final W window; + private final StateAccessorImpl state; + private final Timers timers; + private final TriggerInfoImpl triggerInfo; + + private TriggerContextImpl( + W window, + Timers timers, + ExecutableTrigger trigger, + FinishedTriggers finishedSet) { + trigger.getSpec().super(); + this.window = window; + this.state = new StateAccessorImpl(window, trigger); + this.timers = new TriggerTimers(window, timers); + this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this); + } + + @Override + public Trigger.TriggerContext forTrigger(ExecutableTrigger trigger) { + return new TriggerContextImpl(window, 
timers, trigger, triggerInfo.finishedSet); + } + + @Override + public TriggerInfo trigger() { + return triggerInfo; + } + + @Override + public StateAccessor<?> state() { + return state; + } + + @Override + public W window() { + return window; + } + + @Override + public void deleteTimer(Instant timestamp, TimeDomain domain) { + timers.deleteTimer(timestamp, domain); + } + + @Override + public Instant currentProcessingTime() { + return timers.currentProcessingTime(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timers.currentSynchronizedProcessingTime(); + } + + @Override + @Nullable + public Instant currentEventTime() { + return timers.currentEventTime(); + } + } + + private class OnElementContextImpl extends Trigger.OnElementContext { + + private final W window; + private final StateAccessorImpl state; + private final Timers timers; + private final TriggerInfoImpl triggerInfo; + private final Instant eventTimestamp; + + private OnElementContextImpl( + W window, + Timers timers, + ExecutableTrigger trigger, + FinishedTriggers finishedSet, + Instant eventTimestamp) { + trigger.getSpec().super(); + this.window = window; + this.state = new StateAccessorImpl(window, trigger); + this.timers = new TriggerTimers(window, timers); + this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this); + this.eventTimestamp = eventTimestamp; + } + + + @Override + public Instant eventTimestamp() { + return eventTimestamp; + } + + @Override + public Trigger.OnElementContext forTrigger(ExecutableTrigger trigger) { + return new OnElementContextImpl( + window, timers, trigger, triggerInfo.finishedSet, eventTimestamp); + } + + @Override + public TriggerInfo trigger() { + return triggerInfo; + } + + @Override + public StateAccessor<?> state() { + return state; + } + + @Override + public W window() { + return window; + } + + @Override + public void setTimer(Instant timestamp, TimeDomain domain) { + timers.setTimer(timestamp, domain); + } 
+ + + @Override + public void deleteTimer(Instant timestamp, TimeDomain domain) { + timers.deleteTimer(timestamp, domain); + } + + @Override + public Instant currentProcessingTime() { + return timers.currentProcessingTime(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timers.currentSynchronizedProcessingTime(); + } + + @Override + @Nullable + public Instant currentEventTime() { + return timers.currentEventTime(); + } + } + + private class OnMergeContextImpl extends Trigger.OnMergeContext { + private final MergingStateAccessor<?, W> state; + private final W window; + private final Collection<W> mergingWindows; + private final Timers timers; + private final MergingTriggerInfoImpl triggerInfo; + + private OnMergeContextImpl( + W window, + Timers timers, + ExecutableTrigger trigger, + FinishedTriggers finishedSet, + Map<W, FinishedTriggers> finishedSets) { + trigger.getSpec().super(); + this.mergingWindows = finishedSets.keySet(); + this.window = window; + this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window); + this.timers = new TriggerTimers(window, timers); + this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets); + } + + @Override + public Trigger.OnMergeContext forTrigger(ExecutableTrigger trigger) { + return new OnMergeContextImpl( + window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets); + } + + @Override + public MergingStateAccessor<?, W> state() { + return state; + } + + @Override + public MergingTriggerInfo trigger() { + return triggerInfo; + } + + @Override + public W window() { + return window; + } + + @Override + public void setTimer(Instant timestamp, TimeDomain domain) { + timers.setTimer(timestamp, domain); + } + + /** + * Deletes the timer for {@code timestamp} in {@code domain} by delegating to the wrapped + * {@link TriggerTimers}, which ignores requests to delete the event-time timer at the + * window's max timestamp. + */ + @Override + public void deleteTimer(Instant timestamp, TimeDomain domain) { + // Must delegate to deleteTimer: calling setTimer here would re-register the timer that the + // trigger asked to remove. + timers.deleteTimer(timestamp, domain); + } + + @Override + public Instant currentProcessingTime() { + return timers.currentProcessingTime(); 
+ } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timers.currentSynchronizedProcessingTime(); + } + + @Override + @Nullable + public Instant currentEventTime() { + return timers.currentEventTime(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java new file mode 100644 index 0000000..b591229 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link AfterAll}. + */ +@RunWith(JUnit4.class) +public class AfterAllTest { + + private SimpleTriggerTester<IntervalWindow> tester; + + @Test + public void testT1FiresFirst() throws Exception { + tester = TriggerTester.forTrigger( + AfterAll.of( + AfterPane.elementCountAtLeast(1), + AfterPane.elementCountAtLeast(2)), + FixedWindows.of(Duration.millis(100))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testT2FiresFirst() throws Exception { + tester = TriggerTester.forTrigger( + AfterAll.of( + AfterPane.elementCountAtLeast(2), + AfterPane.elementCountAtLeast(1)), + FixedWindows.of(Duration.millis(100))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + /** + * Tests that the AfterAll properly unsets finished bits when a merge causes it to become + * unfinished. 
+ */ + @Test + public void testOnMergeRewinds() throws Exception { + tester = TriggerTester.forTrigger( + AfterEach.inOrder( + AfterAll.of( + AfterWatermark.pastEndOfWindow(), + AfterPane.elementCountAtLeast(1)), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + + tester.injectElements(5); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + + // Finish the AfterAll in the first window + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + + // Merge them; the AfterAll should not be finished + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + assertFalse(tester.isMarkedFinished(mergedWindow)); + + // Confirm that we are back on the first trigger by probing that it is not ready to fire + // after an element (with merging) + tester.injectElements(3); + tester.mergeWindows(); + assertFalse(tester.shouldFire(mergedWindow)); + + // Fire the AfterAll in the merged window + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(mergedWindow)); + tester.fireIfShouldFire(mergedWindow); + + // Confirm that we are on the second trigger by probing + tester.injectElements(2); + tester.mergeWindows(); + assertTrue(tester.shouldFire(mergedWindow)); + tester.fireIfShouldFire(mergedWindow); + tester.injectElements(2); + tester.mergeWindows(); + assertTrue(tester.shouldFire(mergedWindow)); + tester.fireIfShouldFire(mergedWindow); + } + + @Test + public void testFireDeadline() throws Exception { + BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, + AfterAll.of(AfterWatermark.pastEndOfWindow(), 
AfterPane.elementCountAtLeast(1)) + .getWatermarkThatGuaranteesFiring(window)); + } + + @Test + public void testContinuation() throws Exception { + OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); + OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); + Trigger afterAll = AfterAll.of(trigger1, trigger2); + assertEquals( + AfterAll.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()), + afterAll.getContinuationTrigger()); + } + + @Test + public void testToString() { + Trigger trigger = AfterAll.of(StubTrigger.named("t1"), StubTrigger.named("t2")); + assertEquals("AfterAll.of(t1, t2)", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java new file mode 100644 index 0000000..c413c6e --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link AfterEach}. + */ +@RunWith(JUnit4.class) +public class AfterEachTest { + + private SimpleTriggerTester<IntervalWindow> tester; + + @Before + public void initMocks() { + MockitoAnnotations.initMocks(this); + } + + /** + * Tests that the {@link AfterEach} trigger fires and finishes the first trigger then the second. 
+ */ + @Test + public void testAfterEachInSequence() throws Exception { + tester = TriggerTester.forTrigger( + AfterEach.inOrder( + Repeatedly.forever(AfterPane.elementCountAtLeast(2)) + .orFinally(AfterPane.elementCountAtLeast(3)), + Repeatedly.forever(AfterPane.elementCountAtLeast(5)) + .orFinally(AfterWatermark.pastEndOfWindow())), + FixedWindows.of(Duration.millis(10))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + // AfterCount(2) not ready + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + // AfterCount(2) ready, not finished + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // orFinally(AfterCount(3)) ready and will finish the first + tester.injectElements(1, 2, 3); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // Now running as the second trigger + assertFalse(tester.shouldFire(window)); + // This quantity of elements would fire and finish if it were erroneously still the first + tester.injectElements(1, 2, 3, 4); + assertFalse(tester.shouldFire(window)); + + // Now fire once + tester.injectElements(5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // This time advance the watermark to finish the whole mess. 
+ tester.advanceInputWatermark(new Instant(10)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testFireDeadline() throws Exception { + BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + assertEquals(new Instant(9), + AfterEach.inOrder(AfterWatermark.pastEndOfWindow(), + AfterPane.elementCountAtLeast(4)) + .getWatermarkThatGuaranteesFiring(window)); + + assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, + AfterEach.inOrder(AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()) + .getWatermarkThatGuaranteesFiring(window)); + } + + @Test + public void testContinuation() throws Exception { + OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); + OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); + Trigger afterEach = AfterEach.inOrder(trigger1, trigger2); + assertEquals( + Repeatedly.forever(AfterFirst.of( + trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger())), + afterEach.getContinuationTrigger()); + } + + @Test + public void testToString() { + Trigger trigger = AfterEach.inOrder( + StubTrigger.named("t1"), + StubTrigger.named("t2"), + StubTrigger.named("t3")); + + assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java new file mode 100644 index 0000000..415060b --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link AfterFirst}. 
+ */
@RunWith(JUnit4.class)
public class AfterFirstTest {

  @Mock private OnceTrigger mockTrigger1;
  @Mock private OnceTrigger mockTrigger2;
  private SimpleTriggerTester<IntervalWindow> tester;

  /** Mockito matcher that accepts any {@link Trigger.TriggerContext} argument. */
  private static Trigger.TriggerContext anyTriggerContext() {
    return Mockito.<Trigger.TriggerContext>any();
  }

  /** Shorthand for an {@link IntervalWindow} between the two given millisecond instants. */
  private static IntervalWindow intervalWindow(long startMillis, long endMillis) {
    return new IntervalWindow(new Instant(startMillis), new Instant(endMillis));
  }

  @Before
  public void initMocks() {
    MockitoAnnotations.initMocks(this);
  }

  /** Points {@link #tester} at an {@link AfterFirst} of the two mocks over 10ms fixed windows. */
  private void useMockedAfterFirstInFixedWindows() throws Exception {
    tester = TriggerTester.forTrigger(
        AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
  }

  /** Stubs what each mocked sub-trigger answers from {@code shouldFire}. */
  private void stubShouldFire(boolean firstAnswer, boolean secondAnswer) throws Exception {
    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(firstAnswer);
    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(secondAnswer);
  }

  /** Asserts the window is ready to fire, fires it, then asserts it is marked finished. */
  private void assertFiresThenFinishes(IntervalWindow target) throws Exception {
    assertTrue(tester.shouldFire(target));
    tester.fireIfShouldFire(target);
    assertTrue(tester.isMarkedFinished(target));
  }

  /** With neither sub-trigger ready, the AfterFirst neither fires nor finishes. */
  @Test
  public void testNeitherShouldFireFixedWindows() throws Exception {
    useMockedAfterFirstInFixedWindows();

    tester.injectElements(1);
    IntervalWindow target = intervalWindow(0, 10);

    stubShouldFire(false, false);

    assertFalse(tester.shouldFire(target));
    assertFalse(tester.isMarkedFinished(target));
  }

  /** Only the first sub-trigger is ready: the AfterFirst fires and finishes. */
  @Test
  public void testOnlyT1ShouldFireFixedWindows() throws Exception {
    useMockedAfterFirstInFixedWindows();
    tester.injectElements(1);
    IntervalWindow target = intervalWindow(1, 11);

    stubShouldFire(true, false);

    assertFiresThenFinishes(target);
  }

  /** Only the second sub-trigger is ready: the AfterFirst fires and finishes. */
  @Test
  public void testOnlyT2ShouldFireFixedWindows() throws Exception {
    useMockedAfterFirstInFixedWindows();
    tester.injectElements(1);
    IntervalWindow target = intervalWindow(1, 11);

    stubShouldFire(false, true);

    assertFiresThenFinishes(target);
  }

  /** Both sub-triggers are ready: the AfterFirst fires and finishes. */
  @Test
  public void testBothShouldFireFixedWindows() throws Exception {
    useMockedAfterFirstInFixedWindows();
    tester.injectElements(1);
    IntervalWindow target = intervalWindow(1, 11);

    stubShouldFire(true, true);

    assertFiresThenFinishes(target);
  }

  /**
   * Tests that if the first trigger rewinds to be non-finished in the merged window,
   * then it becomes the currently active trigger again, with real triggers.
   */
  @Test
  public void testShouldFireAfterMerge() throws Exception {
    Trigger countOrWatermark =
        AfterFirst.of(AfterPane.elementCountAtLeast(5), AfterWatermark.pastEndOfWindow());
    Trigger perElement = Repeatedly.forever(AfterPane.elementCountAtLeast(1));
    tester = TriggerTester.forTrigger(
        AfterEach.inOrder(countOrWatermark, perElement),
        Sessions.withGapDuration(Duration.millis(10)));

    // The AfterFirst fires in the first window after the input watermark is advanced to 11
    tester.injectElements(1);
    IntervalWindow earlierWindow = intervalWindow(1, 11);
    assertFalse(tester.shouldFire(earlierWindow));
    tester.advanceInputWatermark(new Instant(11));
    assertTrue(tester.shouldFire(earlierWindow));
    tester.fireIfShouldFire(earlierWindow);

    // A second window in which the AfterFirst is not done yet
    tester.injectElements(5);
    IntervalWindow laterWindow = intervalWindow(5, 15);
    assertFalse(tester.shouldFire(laterWindow));

    // Had the merged window been on the second trigger it would be ready; it must not be
    tester.mergeWindows();
    IntervalWindow mergedWindow = intervalWindow(1, 15);
    assertFalse(tester.shouldFire(mergedWindow));

    // Five further elements make the AfterFirst ready to fire
    tester.injectElements(1, 2, 3, 4, 5);
    tester.mergeWindows();
    assertTrue(tester.shouldFire(mergedWindow));
  }

  /** Checks the watermark reported as guaranteeing firing for two AfterFirst combinations. */
  @Test
  public void testFireDeadline() throws Exception {
    BoundedWindow window = intervalWindow(0, 10);

    Trigger watermarkOrCount =
        AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(4));
    assertEquals(new Instant(9), watermarkOrCount.getWatermarkThatGuaranteesFiring(window));

    Trigger countOnly =
        AfterFirst.of(AfterPane.elementCountAtLeast(2), AfterPane.elementCountAtLeast(1));
    assertEquals(
        BoundedWindow.TIMESTAMP_MAX_VALUE,
        countOnly.getWatermarkThatGuaranteesFiring(window));
  }

  /** The continuation of an AfterFirst equals the AfterFirst of its parts' continuations. */
  @Test
  public void testContinuation() throws Exception {
    OnceTrigger processingTimePart = AfterProcessingTime.pastFirstElementInPane();
    OnceTrigger watermarkPart = AfterWatermark.pastEndOfWindow();
    Trigger combined = AfterFirst.of(processingTimePart, watermarkPart);

    Trigger expectedContinuation =
        AfterFirst.of(
            processingTimePart.getContinuationTrigger(),
            watermarkPart.getContinuationTrigger());
    assertEquals(expectedContinuation, combined.getContinuationTrigger());
  }

  /** Checks the string form of an AfterFirst built from two named stub triggers. */
  @Test
  public void testToString() {
    Trigger combined = AfterFirst.of(StubTrigger.named("t1"), StubTrigger.named("t2"));
    assertEquals("AfterFirst.of(t1, t2)", combined.toString());
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java new file mode 100644 index 0000000..38d030e --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4;

/**
 * Tests for {@link AfterPane}.
 */
@RunWith(JUnit4.class)
public class AfterPaneTest {

  // Assigned at the start of each test that needs a tester; never shadowed by a local.
  private SimpleTriggerTester<IntervalWindow> tester;

  /**
   * Tests that the trigger does fire when enough elements are in a window, and that it only
   * fires that window (no leakage).
   */
  @Test
  public void testAfterPaneElementCountFixedWindows() throws Exception {
    tester = TriggerTester.forTrigger(
        AfterPane.elementCountAtLeast(2),
        FixedWindows.of(Duration.millis(10)));

    tester.injectElements(1); // [0, 10)
    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
    assertFalse(tester.shouldFire(window));

    tester.injectElements(2); // [0, 10)
    tester.injectElements(11); // [10, 20)

    assertTrue(tester.shouldFire(window)); // ready to fire
    tester.fireIfShouldFire(window); // and finished
    assertTrue(tester.isMarkedFinished(window));

    // But don't finish the other window
    assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20))));
  }

  /**
   * Clears the trigger state of a window that has received elements and asserts, through the
   * tester, that the window is cleared.
   */
  @Test
  public void testClear() throws Exception {
    // Uses the shared field like every other test here, instead of a local that shadows it.
    tester = TriggerTester.forTrigger(
        AfterPane.elementCountAtLeast(2),
        FixedWindows.of(Duration.millis(10)));

    tester.injectElements(1, 2, 3);
    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
    tester.clearState(window);
    tester.assertCleared(window);
  }

  /**
   * Tests the element-count trigger with session windows: neither single-element window is
   * ready to fire, while the merged window is, and firing it marks it finished.
   */
  @Test
  public void testAfterPaneElementCountSessions() throws Exception {
    tester = TriggerTester.forTrigger(
        AfterPane.elementCountAtLeast(2),
        Sessions.withGapDuration(Duration.millis(10)));

    tester.injectElements(
        1, // in [1, 11)
        2); // in [2, 12)

    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new Instant(11))));
    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new Instant(12))));

    tester.mergeWindows();

    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12));
    assertTrue(tester.shouldFire(mergedWindow));
    tester.fireIfShouldFire(mergedWindow);
    assertTrue(tester.isMarkedFinished(mergedWindow));

    // Because we closed the previous window, we don't have it around to merge with. So the
    // new merged window fires and finishes on its own.
    tester.injectElements(
        7, // in [7, 17)
        9); // in [9, 19)

    tester.mergeWindows();

    IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new Instant(19));
    assertTrue(tester.shouldFire(newMergedWindow));
    tester.fireIfShouldFire(newMergedWindow);
    assertTrue(tester.isMarkedFinished(newMergedWindow));
  }

  /** An element-count trigger reports the maximum timestamp as its firing-guarantee watermark. */
  @Test
  public void testFireDeadline() throws Exception {
    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
        AfterPane.elementCountAtLeast(1).getWatermarkThatGuaranteesFiring(
            new IntervalWindow(new Instant(0), new Instant(10))));
  }

  /**
   * The continuation of {@code elementCountAtLeast(100)} equals {@code elementCountAtLeast(1)},
   * and so does the continuation of that continuation.
   */
  @Test
  public void testContinuation() throws Exception {
    assertEquals(
        AfterPane.elementCountAtLeast(1),
        AfterPane.elementCountAtLeast(100).getContinuationTrigger());
    assertEquals(
        AfterPane.elementCountAtLeast(1),
        AfterPane.elementCountAtLeast(100).getContinuationTrigger().getContinuationTrigger());
  }

  /** Checks the string form of an element-count trigger. */
  @Test
  public void testToString() {
    Trigger trigger = AfterPane.elementCountAtLeast(5);
    assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString());
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java new file mode 100644 index 0000000..13a7acf --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4;

/**
 * Tests the {@link AfterProcessingTime}.
 */
@RunWith(JUnit4.class)
public class AfterProcessingTimeTest {

  /**
   * Tests the basic property that the trigger does wait for processing time to be
   * far enough advanced.
   */
  @Test
  public void testAfterProcessingTimeFixedWindows() throws Exception {
    Duration windowDuration = Duration.millis(10);
    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
        AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.millis(5)),
        FixedWindows.of(windowDuration));

    tester.advanceProcessingTime(new Instant(10));

    // Timer at 15
    tester.injectElements(1);
    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
    tester.advanceProcessingTime(new Instant(12));
    assertFalse(tester.shouldFire(firstWindow));

    // Load up elements in the next window, timer at 17 for them
    tester.injectElements(11, 12, 13);
    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
    assertFalse(tester.shouldFire(secondWindow));

    // Not quite time to fire
    tester.advanceProcessingTime(new Instant(14));
    assertFalse(tester.shouldFire(firstWindow));
    assertFalse(tester.shouldFire(secondWindow));

    // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
    tester.injectElements(2, 3);

    // Advance past the first timer and fire, finishing the first window
    tester.advanceProcessingTime(new Instant(16));
    assertTrue(tester.shouldFire(firstWindow));
    assertFalse(tester.shouldFire(secondWindow));
    tester.fireIfShouldFire(firstWindow);
    assertTrue(tester.isMarkedFinished(firstWindow));

    // The next window fires and finishes now
    tester.advanceProcessingTime(new Instant(18));
    assertTrue(tester.shouldFire(secondWindow));
    tester.fireIfShouldFire(secondWindow);
    assertTrue(tester.isMarkedFinished(secondWindow));
  }

  /**
   * Clears the trigger state of a window that has received elements and asserts, through the
   * tester, that the window is cleared.
   */
  @Test
  public void testClear() throws Exception {
    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
        AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.millis(5)),
        FixedWindows.of(Duration.millis(10)));

    tester.injectElements(1, 2, 3);
    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
    tester.clearState(window);
    tester.assertCleared(window);
  }

  /**
   * Tests that when windows merge, if the trigger is waiting for "N millis after the first
   * element" that it is relative to the earlier of the two merged windows.
   */
  @Test
  public void testAfterProcessingTimeWithMergingWindow() throws Exception {
    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
        AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.millis(5)),
        Sessions.withGapDuration(Duration.millis(10)));

    tester.advanceProcessingTime(new Instant(10));
    tester.injectElements(1); // in [1, 11), timer for 15
    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
    assertFalse(tester.shouldFire(firstWindow));

    tester.advanceProcessingTime(new Instant(12));
    tester.injectElements(3); // in [3, 13), timer for 17
    IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
    assertFalse(tester.shouldFire(secondWindow));

    tester.mergeWindows();
    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));

    // 16 is past the earlier timer (15) but before the later one (17)
    tester.advanceProcessingTime(new Instant(16));
    assertTrue(tester.shouldFire(mergedWindow));
  }

  /** A processing-time trigger reports the maximum timestamp as its firing-guarantee watermark. */
  @Test
  public void testFireDeadline() throws Exception {
    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
        AfterProcessingTime.pastFirstElementInPane().getWatermarkThatGuaranteesFiring(
            new IntervalWindow(new Instant(0), new Instant(10))));
  }

  /**
   * The continuation of a delayed processing-time trigger equals a new
   * {@link AfterSynchronizedProcessingTime}.
   */
  @Test
  public void testContinuation() throws Exception {
    OnceTrigger firstElementPlus1 =
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1));
    assertEquals(
        new AfterSynchronizedProcessingTime(),
        firstElementPlus1.getContinuationTrigger());
  }

  /**
   * Basic test of compatibility check between identical triggers.
   */
  @Test
  public void testCompatibilityIdentical() throws Exception {
    Trigger t1 = AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(1L));
    Trigger t2 = AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(1L));
    assertTrue(t1.isCompatible(t2));
  }

  /** Checks the string form of the trigger without any delay. */
  @Test
  public void testToString() {
    Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
    assertEquals("AfterProcessingTime.pastFirstElementInPane()", trigger.toString());
  }

  /** Checks that an added delay is rendered in the string form. */
  @Test
  public void testWithDelayToString() {
    Trigger trigger = AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(5));

    assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)",
        trigger.toString());
  }

  /** Checks the string form when the trigger is nested as the late firing of a watermark trigger. */
  @Test
  public void testBuiltUpToString() {
    Trigger trigger = AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(10)));

    String expected = "AfterWatermark.pastEndOfWindow()"
        + ".withLateFirings(AfterProcessingTime"
        + ".pastFirstElementInPane()"
        + ".plusDelayOf(10 minutes))";

    assertEquals(expected, trigger.toString());
  }
}