http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java new file mode 100644 index 0000000..a9feb73 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.common.collect.Sets; +import java.util.Set; + +/** + * An implementation of {@link FinishedTriggers} atop a user-provided mutable {@link Set}. 
+ */ +public class FinishedTriggersSet implements FinishedTriggers { + + private final Set<ExecutableTrigger> finishedTriggers; + + private FinishedTriggersSet(Set<ExecutableTrigger> finishedTriggers) { + this.finishedTriggers = finishedTriggers; + } + + public static FinishedTriggersSet fromSet(Set<ExecutableTrigger> finishedTriggers) { + return new FinishedTriggersSet(finishedTriggers); + } + + /** + * Returns a mutable {@link Set} of the underlying triggers that are finished. + */ + public Set<ExecutableTrigger> getFinishedTriggers() { + return finishedTriggers; + } + + @Override + public boolean isFinished(ExecutableTrigger trigger) { + return finishedTriggers.contains(trigger); + } + + @Override + public void setFinished(ExecutableTrigger trigger, boolean value) { + if (value) { + finishedTriggers.add(trigger); + } else { + finishedTriggers.remove(trigger); + } + } + + @Override + public void clearRecursively(ExecutableTrigger trigger) { + finishedTriggers.remove(trigger); + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + clearRecursively(subTrigger); + } + } + + @Override + public FinishedTriggersSet copy() { + return fromSet(Sets.newHashSet(finishedTriggers)); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java new file mode 100644 index 0000000..5f20465 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import java.util.List; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.joda.time.Instant; + +/** + * A trigger which never fires. + * + * <p>Using this trigger will only produce output when the watermark passes the end of the + * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed + * lateness}. + */ +public final class Never { + /** + * Returns a trigger which never fires. 
Output will be produced from the using {@link GroupByKey} + * when the {@link BoundedWindow} closes. + */ + public static OnceTrigger ever() { + // NeverTrigger ignores all inputs and is Window-type independent. + return new NeverTrigger(); + } + + // package-private in order to check identity for string formatting. + static class NeverTrigger extends OnceTrigger { + protected NeverTrigger() { + super(null); + } + + @Override + public void onElement(OnElementContext c) {} + + @Override + public void onMerge(OnMergeContext c) {} + + @Override + protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + return this; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) { + return false; + } + + @Override + protected void onOnlyFiring(Trigger.TriggerContext context) { + throw new UnsupportedOperationException( + String.format("%s should never fire", getClass().getSimpleName())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java new file mode 100644 index 0000000..25b7b34 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.util.ExecutableTrigger; +import org.joda.time.Instant; + +/** + * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires. + */ +class OrFinallyTrigger extends Trigger { + + private static final int ACTUAL = 0; + private static final int UNTIL = 1; + + @VisibleForTesting OrFinallyTrigger(Trigger actual, Trigger.OnceTrigger until) { + super(Arrays.asList(actual, until)); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + c.trigger().subTrigger(ACTUAL).invokeOnElement(c); + c.trigger().subTrigger(UNTIL).invokeOnElement(c); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + subTrigger.invokeOnMerge(c); + } + updateFinishedState(c); + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + // This trigger fires once either the trigger or the until trigger fires. + Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window); + Instant untilDeadline = subTriggers.get(UNTIL).getWatermarkThatGuaranteesFiring(window); + return actualDeadline.isBefore(untilDeadline) ? 
actualDeadline : untilDeadline; + } + + @Override + public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + // Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL + // may not be a OnceTrigger. + return Repeatedly.forever( + new OrFinallyTrigger( + continuationTriggers.get(ACTUAL), + (Trigger.OnceTrigger) continuationTriggers.get(UNTIL))); + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context) + || context.trigger().subTrigger(UNTIL).invokeShouldFire(context); + } + + @Override + public void onFire(Trigger.TriggerContext context) throws Exception { + ExecutableTrigger actualSubtrigger = context.trigger().subTrigger(ACTUAL); + ExecutableTrigger untilSubtrigger = context.trigger().subTrigger(UNTIL); + + if (untilSubtrigger.invokeShouldFire(context)) { + untilSubtrigger.invokeOnFire(context); + actualSubtrigger.invokeClear(context); + } else { + // If until didn't fire, then the actual must have (or it is forbidden to call + // onFire) so we are done only if actual is done. + actualSubtrigger.invokeOnFire(context); + // Do not clear the until trigger, because it tracks data across firings. 
+ } + updateFinishedState(context); + } + + @Override + public String toString() { + return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL)); + } + + private void updateFinishedState(TriggerContext c) throws Exception { + boolean anyStillFinished = false; + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished(); + } + c.trigger().setFinished(anyStillFinished); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java new file mode 100644 index 0000000..8858798 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.util.ExecutableTrigger; +import org.joda.time.Instant; + +/** + * Repeat a trigger, either until some condition is met or forever. + * + * <p>For example, to fire after the end of the window, and every time late data arrives: + * <pre> {@code + * Repeatedly.forever(AfterWatermark.isPastEndOfWindow()); + * } </pre> + * + * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite + * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}. + */ +public class Repeatedly extends Trigger { + + private static final int REPEATED = 0; + + /** + * Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each + * time it fires and ignoring any indications to finish. + * + * <p>Unless used with {@link Trigger#orFinally} the composite trigger will never finish. + * + * @param repeated the trigger to execute repeatedly. + */ + public static Repeatedly forever(Trigger repeated) { + return new Repeatedly(repeated); + } + + private Repeatedly(Trigger repeated) { + super(Arrays.asList(repeated)); + } + + + @Override + public void onElement(OnElementContext c) throws Exception { + getRepeated(c).invokeOnElement(c); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + getRepeated(c).invokeOnMerge(c); + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + // This trigger fires once the repeated trigger fires. 
+ return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window); + } + + @Override + public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + return new Repeatedly(continuationTriggers.get(REPEATED)); + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + return getRepeated(context).invokeShouldFire(context); + } + + @Override + public void onFire(TriggerContext context) throws Exception { + getRepeated(context).invokeOnFire(context); + + if (context.trigger().isFinished(REPEATED)) { + // Reset tree will recursively clear the finished bits, and invoke clear. + context.forTrigger(getRepeated(context)).trigger().resetTree(); + } + } + + @Override + public String toString() { + return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED)); + } + + private ExecutableTrigger getRepeated(TriggerContext context) { + return context.trigger().subTrigger(REPEATED); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java new file mode 100644 index 0000000..9e2c27d --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.joda.time.Instant; + +/** + * The trigger used with {@link Reshuffle} which triggers on every element + * and never buffers state. + * + * @param <W> The kind of window that is being reshuffled. + */ +public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger { + + public ReshuffleTrigger() { + super(null); + } + + @Override + public void onElement(Trigger.OnElementContext c) { } + + @Override + public void onMerge(Trigger.OnMergeContext c) { } + + @Override + protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + return this; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + throw new UnsupportedOperationException( + "ReshuffleTrigger should not be used outside of Reshuffle"); + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + return true; + } + + @Override + public void onFire(Trigger.TriggerContext context) throws Exception { } + + @Override + public String toString() { + return "ReshuffleTrigger()"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java new file mode 100644 index 0000000..a960aa4 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java @@ -0,0 +1,527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.common.base.Joiner; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.util.ExecutableTrigger; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.joda.time.Instant; + +/** + * {@code Trigger}s control when the elements for a specific key and window are output. As elements + * arrive, they are put into one or more windows by a {@link Window} transform and its associated + * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the + * {@code Window}s contents should be output. 
+ * + * <p>See {@link org.apache.beam.sdk.transforms.GroupByKey} and {@link Window} + * for more information about how grouping with windows works. + * + * <p>The elements that are assigned to a window since the last time it was fired (or since the + * window was created) are placed into the current window pane. Triggers are evaluated against the + * elements as they are added. When the root trigger fires, the elements in the current pane will be + * output. When the root trigger finishes (indicating it will never fire again), the window is + * closed and any new elements assigned to that window are discarded. + * + * <p>Several predefined {@code Trigger}s are provided: + * <ul> + * <li> {@link AfterWatermark} for firing when the watermark passes a timestamp determined from + * either the end of the window or the arrival of the first element in a pane. + * <li> {@link AfterProcessingTime} for firing after some amount of processing time has elapsed + * (typically since the first element in a pane). + * <li> {@link AfterPane} for firing off a property of the elements in the current pane, such as + * the number of elements that have been assigned to the current pane. + * </ul> + * + * <p>In addition, {@code Trigger}s can be combined in a variety of ways: + * <ul> + * <li> {@link Repeatedly#forever} to create a trigger that executes forever. Any time its + * argument finishes it gets reset and starts over. Can be combined with + * {@link Trigger#orFinally} to specify a condition that causes the repetition to stop. + * <li> {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every) + * time that a trigger fires, and advancing to the next trigger in the sequence when it finishes. + * <li> {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments + * fires. An {@link AfterFirst} trigger finishes after it fires once. 
+ * <li> {@link AfterAll#of} to create a trigger that fires after all of its arguments + * have fired at least once. An {@link AfterAll} trigger finishes after it fires once. + * </ul> + * + * <p>Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one + * of the following states: + * <ul> + * <li> Never Existed - before the trigger has started executing, there is no state associated + * with it anywhere in the system. A trigger moves to the executing state as soon as it + * processes in the current pane. + * <li> Executing - while the trigger is receiving items and may fire. While it is in this state, + * it may persist book-keeping information to persisted state, set timers, etc. + * <li> Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the + * system remembers only that it is finished. Entering this state causes us to discard any + * elements in the buffer for that window, as well. + * </ul> + * + * <p>Once finished, a trigger cannot return itself back to an earlier state, however a composite + * trigger could reset its sub-triggers. + * + * <p>Triggers should not build up any state internally since they may be recreated + * between invocations of the callbacks. All important values should be persisted using + * state before the callback returns. + */ +@Experimental(Experimental.Kind.TRIGGER) +public abstract class Trigger implements Serializable { + + /** + * Interface for accessing information about the trigger being executed and other triggers in the + * same tree. + */ + public interface TriggerInfo { + + /** + * Returns true if the windowing strategy of the current {@code PCollection} is a merging + * WindowFn. If true, the trigger execution needs to keep enough information to support the + * possibility of {@link Trigger#onMerge} being called. If false, {@link Trigger#onMerge} will + * never be called. 
+ */ + boolean isMerging(); + + /** + * Access the executable versions of the sub-triggers of the current trigger. + */ + Iterable<ExecutableTrigger> subTriggers(); + + /** + * Access the executable version of the specified sub-trigger. + */ + ExecutableTrigger subTrigger(int subtriggerIndex); + + /** + * Returns true if the current trigger is marked finished. + */ + boolean isFinished(); + + /** + * Return true if the given subtrigger is marked finished. + */ + boolean isFinished(int subtriggerIndex); + + /** + * Returns true if all the sub-triggers of the current trigger are marked finished. + */ + boolean areAllSubtriggersFinished(); + + /** + * Returns an iterable over the unfinished sub-triggers of the current trigger. + */ + Iterable<ExecutableTrigger> unfinishedSubTriggers(); + + /** + * Returns the first unfinished sub-trigger. + */ + ExecutableTrigger firstUnfinishedSubTrigger(); + + /** + * Clears all keyed state for triggers in the current sub-tree and unsets all the associated + * finished bits. + */ + void resetTree() throws Exception; + + /** + * Sets the finished bit for the current trigger. + */ + void setFinished(boolean finished); + + /** + * Sets the finished bit for the given sub-trigger. + */ + void setFinished(boolean finished, int subTriggerIndex); + } + + /** + * Interact with properties of the trigger being executed, with extensions to deal with the + * merging windows. + */ + public interface MergingTriggerInfo extends TriggerInfo { + + /** Return true if the trigger is finished in any window being merged. */ + public abstract boolean finishedInAnyMergingWindow(); + + /** Return true if the trigger is finished in all windows being merged. */ + public abstract boolean finishedInAllMergingWindows(); + } + + /** + * Information accessible to all operational hooks in this {@code Trigger}. + * + * <p>Used directly in {@link Trigger#shouldFire} and {@link Trigger#clear}, and + * extended with additional information in other methods. 
+ */ + public abstract class TriggerContext { + + /** Returns the interface for accessing trigger info. */ + public abstract TriggerInfo trigger(); + + /** Returns the interface for accessing persistent state. */ + public abstract StateAccessor<?> state(); + + /** The window that the current context is executing in. */ + public abstract BoundedWindow window(); + + /** Create a sub-context for the given sub-trigger. */ + public abstract TriggerContext forTrigger(ExecutableTrigger trigger); + + /** + * Removes the timer set in this trigger context for the given {@link Instant} + * and {@link TimeDomain}. + */ + public abstract void deleteTimer(Instant timestamp, TimeDomain domain); + + /** The current processing time. */ + public abstract Instant currentProcessingTime(); + + /** The current synchronized upstream processing time or {@code null} if unknown. */ + @Nullable + public abstract Instant currentSynchronizedProcessingTime(); + + /** The current event time for the input or {@code null} if unknown. */ + @Nullable + public abstract Instant currentEventTime(); + } + + /** + * Extended {@link TriggerContext} containing information accessible to the {@link #onElement} + * operational hook. + */ + public abstract class OnElementContext extends TriggerContext { + /** The event timestamp of the element currently being processed. */ + public abstract Instant eventTimestamp(); + + /** + * Sets a timer to fire when the watermark or processing time is beyond the given timestamp. + * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards. + * + * <p>As with {@link #state}, timers are implicitly scoped to the current window. All + * timer firings for a window will be received, but the implementation should choose to ignore + * those that are not applicable. 
+ * + * @param timestamp the time at which the trigger should be re-evaluated + * @param domain the domain that the {@code timestamp} applies to + */ + public abstract void setTimer(Instant timestamp, TimeDomain domain); + + /** Create an {@code OnElementContext} for executing the given trigger. */ + @Override + public abstract OnElementContext forTrigger(ExecutableTrigger trigger); + } + + /** + * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge} + * operational hook. + */ + public abstract class OnMergeContext extends TriggerContext { + /** + * Sets a timer to fire when the watermark or processing time is beyond the given timestamp. + * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards. + * + * <p>As with {@link #state}, timers are implicitly scoped to the current window. All + * timer firings for a window will be received, but the implementation should choose to ignore + * those that are not applicable. + * + * @param timestamp the time at which the trigger should be re-evaluated + * @param domain the domain that the {@code timestamp} applies to + */ + public abstract void setTimer(Instant timestamp, TimeDomain domain); + + /** Create an {@code OnMergeContext} for executing the given trigger. */ + @Override + public abstract OnMergeContext forTrigger(ExecutableTrigger trigger); + + @Override + public abstract MergingStateAccessor<?, ?> state(); + + @Override + public abstract MergingTriggerInfo trigger(); + } + + @Nullable + protected final List<Trigger> subTriggers; + + protected Trigger(@Nullable List<Trigger> subTriggers) { + this.subTriggers = subTriggers; + } + + + /** + * Called every time an element is incorporated into a window. + */ + public abstract void onElement(OnElementContext c) throws Exception; + + /** + * Called immediately after windows have been merged. 
*
   * <p>Leaf triggers should update their state by inspecting their status and any state
   * in the merging windows. Composite triggers should update their state by calling
   * {@link ExecutableTrigger#invokeOnMerge} on their sub-triggers, and applying appropriate logic.
   *
   * <p>A trigger such as {@link AfterWatermark#pastEndOfWindow} may no longer be finished;
   * it is the responsibility of the trigger itself to record this fact. It is forbidden for
   * a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending
   * elements that led to it being ready to fire.
   *
   * <p>The implementation does not need to clear out any state associated with the old windows.
   */
  public abstract void onMerge(OnMergeContext c) throws Exception;

  /**
   * Returns {@code true} if the current state of the trigger indicates that its condition
   * is satisfied and it is ready to fire.
   */
  public abstract boolean shouldFire(TriggerContext context) throws Exception;

  /**
   * Adjusts the state of the trigger to be ready for the next pane. For example, a
   * {@link Repeatedly} trigger will reset its inner trigger, since it has fired.
   *
   * <p>If the trigger is finished, it is the responsibility of the trigger itself to
   * record that fact via the {@code context}.
   */
  public abstract void onFire(TriggerContext context) throws Exception;

  /**
   * Called to allow the trigger to prefetch any state it will likely need to read from during
   * an {@link #onElement} call.
   *
   * <p>This default implementation only forwards the call to each sub-trigger; when there are
   * no sub-triggers ({@code subTriggers == null}) it does nothing.
   */
  public void prefetchOnElement(StateAccessor<?> state) {
    if (subTriggers != null) {
      for (Trigger trigger : subTriggers) {
        trigger.prefetchOnElement(state);
      }
    }
  }

  /**
   * Called to allow the trigger to prefetch any state it will likely need to read from during
   * an {@link #onMerge} call.
   *
   * <p>This default implementation only forwards the call to each sub-trigger.
   */
  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
    if (subTriggers != null) {
      for (Trigger trigger : subTriggers) {
        trigger.prefetchOnMerge(state);
      }
    }
  }

  /**
   * Called to allow the trigger to prefetch any state it will likely need to read from during
   * an {@link #shouldFire} call.
   *
   * <p>This default implementation only forwards the call to each sub-trigger.
   */
  public void prefetchShouldFire(StateAccessor<?> state) {
    if (subTriggers != null) {
      for (Trigger trigger : subTriggers) {
        trigger.prefetchShouldFire(state);
      }
    }
  }

  /**
   * Called to allow the trigger to prefetch any state it will likely need to read from during
   * an {@link #onFire} call.
   *
   * <p>This default implementation only forwards the call to each sub-trigger.
   */
  public void prefetchOnFire(StateAccessor<?> state) {
    if (subTriggers != null) {
      for (Trigger trigger : subTriggers) {
        trigger.prefetchOnFire(state);
      }
    }
  }

  /**
   * Clear any state associated with this trigger in the given window.
   *
   * <p>This is called after a trigger has indicated it will never fire again. The trigger system
   * keeps enough information to know that the trigger is finished, so this trigger should clear all
   * of its state.
   */
  public void clear(TriggerContext c) throws Exception {
    if (subTriggers != null) {
      // The null check is on this trigger's own sub-trigger list, but the iteration goes over
      // the executable wrappers exposed by the context, because invokeClear lives on
      // ExecutableTrigger rather than on Trigger.
      for (ExecutableTrigger trigger : c.trigger().subTriggers()) {
        trigger.invokeClear(c);
      }
    }
  }

  /**
   * Returns the sub-triggers of this trigger exactly as stored.
   *
   * <p>The other methods of this class null-check the backing field, so callers should be
   * prepared for a {@code null} result.
   */
  public Iterable<Trigger> subTriggers() {
    return subTriggers;
  }

  /**
   * Return a trigger to use after a {@code GroupByKey} to preserve the
   * intention of this trigger. Specifically, triggers that are time based
   * and intended to provide speculative results should continue providing
   * speculative results. Triggers that fire once (or multiple times) should
   * continue firing once (or multiple times).
   */
  public Trigger getContinuationTrigger() {
    if (subTriggers == null) {
      // No sub-triggers: the subclass hook receives null rather than an empty list.
      return getContinuationTrigger(null);
    }

    // Compute each sub-trigger's continuation first, preserving order, then let the subclass
    // combine them.
    List<Trigger> subTriggerContinuations = new ArrayList<>();
    for (Trigger subTrigger : subTriggers) {
      subTriggerContinuations.add(subTrigger.getContinuationTrigger());
    }
    return getContinuationTrigger(subTriggerContinuations);
  }

  /**
   * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For convenience, this
   * is provided the continuation trigger of each of the sub-triggers.
   *
   * @param continuationTriggers the continuations of the sub-triggers in order, or {@code null}
   *     when this trigger has no sub-triggers
   */
  protected abstract Trigger getContinuationTrigger(List<Trigger> continuationTriggers);

  /**
   * Returns a bound in watermark time by which this trigger would have fired at least once
   * for a given window had there been input data. This is a static property of a trigger
   * that does not depend on its state.
   *
   * <p>For triggers that do not fire based on the watermark advancing, returns
   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
   *
   * <p>This estimate is used to determine that there are no elements in a side-input window, which
   * causes the default value to be used instead.
   */
  public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window);

  /**
   * Returns whether this performs the same triggering as the given {@code Trigger}.
   *
   * <p>This default implementation requires the same concrete class, the same number of
   * sub-triggers, and pairwise-compatible sub-triggers in the same positions.
   */
  public boolean isCompatible(Trigger other) {
    if (!getClass().equals(other.getClass())) {
      return false;
    }

    if (subTriggers == null) {
      return other.subTriggers == null;
    } else if (other.subTriggers == null) {
      return false;
    } else if (subTriggers.size() != other.subTriggers.size()) {
      return false;
    }

    for (int i = 0; i < subTriggers.size(); i++) {
      if (!subTriggers.get(i).isCompatible(other.subTriggers.get(i))) {
        return false;
      }
    }

    return true;
  }

  /**
   * Renders the simple class name (prefixed with the enclosing class name for nested classes),
   * followed by the comma-separated sub-triggers in parentheses when there are any.
   */
  @Override
  public String toString() {
    String simpleName = getClass().getSimpleName();
    if (getClass().getEnclosingClass() != null) {
      simpleName = getClass().getEnclosingClass().getSimpleName() + "." + simpleName;
    }
    if (subTriggers == null || subTriggers.size() == 0) {
      return simpleName;
    } else {
      return simpleName + "(" + Joiner.on(", ").join(subTriggers) + ")";
    }
  }

  /** Two triggers are equal when they have the same concrete class and equal sub-triggers. */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof Trigger)) {
      return false;
    }
    Trigger that = (Trigger) obj;
    return Objects.equals(getClass(), that.getClass())
        && Objects.equals(subTriggers, that.subTriggers);
  }

  /** Hashes the same components that {@link #equals} compares. */
  @Override
  public int hashCode() {
    return Objects.hash(getClass(), subTriggers);
  }

  /**
   * Specify an ending condition for this trigger. If the {@code until} fires then the combination
   * fires.
   *
   * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes
   * as soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time
   * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that
   * {@code t1} may have fired since {@code t2} started, so not all of the elements that {@code t2}
   * has seen are necessarily in the current pane.
   *
   * <p>For example the final firing of the following trigger may only have 1 element:
   * <pre> {@code
   * Repeatedly.forever(AfterPane.elementCountAtLeast(2))
   *     .orFinally(AfterPane.elementCountAtLeast(5))
   * } </pre>
   *
   * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same
   * as {@code AfterFirst.of(t1, t2)}.
   */
  public Trigger orFinally(OnceTrigger until) {
    return new OrFinallyTrigger(this, until);
  }

  /**
   * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather
   * than the general {@link Trigger} class to indicate that behavior.
   */
  public abstract static class OnceTrigger extends Trigger {
    protected OnceTrigger(List<Trigger> subTriggers) {
      super(subTriggers);
    }

    /**
     * Narrows the return type to {@link OnceTrigger}.
     *
     * @throws IllegalStateException if the continuation computed by the superclass is not a
     *     {@link OnceTrigger}
     */
    @Override
    public final OnceTrigger getContinuationTrigger() {
      Trigger continuation = super.getContinuationTrigger();
      if (!(continuation instanceof OnceTrigger)) {
        throw new IllegalStateException("Continuation of a OnceTrigger must be a OnceTrigger");
      }
      return (OnceTrigger) continuation;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public final void onFire(TriggerContext context) throws Exception {
      // Run the subclass's firing logic first, then record that this trigger is finished.
      onOnlyFiring(context);
      context.trigger().setFinished(true);
    }

    /**
     * Called by {@link #onFire} when the trigger is fired, immediately before the trigger is
     * marked finished.
     *
     * <p>This method is abstract, so there is no default behavior: every subclass supplies its
     * own firing logic. (Presumably composite triggers invoke {@code onFire} on the sub-triggers
     * for which {@link #shouldFire} is {@code true} - TODO confirm against the implementations.)
     */
    protected abstract void onOnlyFiring(TriggerContext context) throws Exception;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java new file mode 100644 index 0000000..e09aac2 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/
package org.apache.beam.sdk.util;

import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Trigger.MergingTriggerInfo;
import org.apache.beam.sdk.transforms.windowing.Trigger.TriggerInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.state.MergingStateAccessor;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateAccessor;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
import org.joda.time.Instant;

/**
 * Factory for creating instances of the various {@link Trigger} contexts.
 *
 * <p>These contexts are highly interdependent and share many fields; it is inadvisable
 * to create them via any means other than this factory class.
 *
 * @param <W> the kind of window the created contexts operate on
 */
public class TriggerContextFactory<W extends BoundedWindow> {

  private final WindowFn<?, W> windowFn;
  private final StateInternals<?> stateInternals;
  private final Coder<W> windowCoder;

  /**
   * Creates a factory whose contexts read and write state through {@code stateInternals}.
   *
   * @param windowFn supplies the window coder and whether windows merge
   * @param stateInternals backing store for all state accessed through the created contexts
   * @param activeWindows currently unused; retained so existing callers keep compiling
   */
  public TriggerContextFactory(WindowFn<?, W> windowFn,
      StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
    // Future triggers may be able to exploit the active window to state address window mapping.
    this.windowFn = windowFn;
    this.stateInternals = stateInternals;
    this.windowCoder = windowFn.windowCoder();
  }

  /** Creates the basic context used for {@code shouldFire}, {@code onFire} and {@code clear}. */
  public Trigger.TriggerContext base(W window, Timers timers,
      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) {
    return new TriggerContextImpl(window, timers, rootTrigger, finishedSet);
  }

  /** Creates the context passed to a trigger when an element arrives in {@code window}. */
  public Trigger.OnElementContext createOnElementContext(
      W window, Timers timers, Instant elementTimestamp,
      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) {
    return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp);
  }

  /**
   * Creates the context passed to a trigger when windows are merged into {@code window}.
   *
   * @param finishedSet finished bits of the merge result
   * @param finishedSets finished bits of every window being merged, keyed by that window
   */
  public Trigger.OnMergeContext createOnMergeContext(W window, Timers timers,
      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet,
      Map<W, FinishedTriggers> finishedSets) {
    return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets);
  }

  /** Creates a state accessor scoped to {@code window} and the index of {@code trigger}. */
  public StateAccessor<?> createStateAccessor(W window, ExecutableTrigger trigger) {
    return new StateAccessorImpl(window, trigger);
  }

  /**
   * Creates a state accessor for a merge: plain access targets {@code mergeResult}, while
   * per-window access covers each of {@code mergingWindows}.
   */
  public MergingStateAccessor<?, W> createMergingStateAccessor(
      W mergeResult, Collection<W> mergingWindows, ExecutableTrigger trigger) {
    return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult);
  }

  /** View of one trigger node and its finished bits, backed by a shared finished set. */
  private class TriggerInfoImpl implements Trigger.TriggerInfo {

    protected final ExecutableTrigger trigger;
    protected final FinishedTriggers finishedSet;
    private final Trigger.TriggerContext context;

    public TriggerInfoImpl(ExecutableTrigger trigger, FinishedTriggers finishedSet,
        Trigger.TriggerContext context) {
      this.trigger = trigger;
      this.finishedSet = finishedSet;
      this.context = context;
    }

    @Override
    public boolean isMerging() {
      return !windowFn.isNonMerging();
    }

    @Override
    public Iterable<ExecutableTrigger> subTriggers() {
      return trigger.subTriggers();
    }

    @Override
    public ExecutableTrigger subTrigger(int subtriggerIndex) {
      return trigger.subTriggers().get(subtriggerIndex);
    }

    @Override
    public boolean isFinished() {
      return finishedSet.isFinished(trigger);
    }

    @Override
    public boolean isFinished(int subtriggerIndex) {
      return finishedSet.isFinished(subTrigger(subtriggerIndex));
    }

    @Override
    public boolean areAllSubtriggersFinished() {
      return Iterables.isEmpty(unfinishedSubTriggers());
    }

    @Override
    public Iterable<ExecutableTrigger> unfinishedSubTriggers() {
      // Lazy view: the filter is evaluated against the finished set on each iteration.
      return FluentIterable
          .from(trigger.subTriggers())
          .filter(new Predicate<ExecutableTrigger>() {
            @Override
            public boolean apply(ExecutableTrigger trigger) {
              return !finishedSet.isFinished(trigger);
            }
          });
    }

    /** Returns the first sub-trigger not marked finished, or {@code null} if all are finished. */
    @Override
    public ExecutableTrigger firstUnfinishedSubTrigger() {
      for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
        if (!finishedSet.isFinished(subTrigger)) {
          return subTrigger;
        }
      }
      return null;
    }

    @Override
    public void resetTree() throws Exception {
      // Drop the finished bits for the whole subtree, then let the trigger clear its own state.
      finishedSet.clearRecursively(trigger);
      trigger.invokeClear(context);
    }

    @Override
    public void setFinished(boolean finished) {
      finishedSet.setFinished(trigger, finished);
    }

    @Override
    public void setFinished(boolean finished, int subTriggerIndex) {
      finishedSet.setFinished(subTrigger(subTriggerIndex), finished);
    }
  }

  /**
   * {@link Timers} wrapper handed to triggers; it delegates everything except deletion of the
   * event-time timer at the window's maximum timestamp, which it silently ignores.
   */
  private class TriggerTimers implements Timers {

    private final Timers timers;
    private final W window;

    public TriggerTimers(W window, Timers timers) {
      this.timers = timers;
      this.window = window;
    }

    @Override
    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
      timers.setTimer(timestamp, timeDomain);
    }

    @Override
    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
      if (timeDomain == TimeDomain.EVENT_TIME
          && timestamp.equals(window.maxTimestamp())) {
        // Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time
        // state transitions.
        return;
      }
      timers.deleteTimer(timestamp, timeDomain);
    }

    @Override
    public Instant currentProcessingTime() {
      return timers.currentProcessingTime();
    }

    @Override
    @Nullable
    public Instant currentSynchronizedProcessingTime() {
      return timers.currentSynchronizedProcessingTime();
    }

    @Override
    public Instant currentEventTime() {
      return timers.currentEventTime();
    }
  }

  /** {@link TriggerInfoImpl} that can also answer questions about the windows being merged. */
  private class MergingTriggerInfoImpl
      extends TriggerInfoImpl implements Trigger.MergingTriggerInfo {

    private final Map<W, FinishedTriggers> finishedSets;

    public MergingTriggerInfoImpl(
        ExecutableTrigger trigger,
        FinishedTriggers finishedSet,
        Trigger.TriggerContext context,
        Map<W, FinishedTriggers> finishedSets) {
      super(trigger, finishedSet, context);
      this.finishedSets = finishedSets;
    }

    @Override
    public boolean finishedInAnyMergingWindow() {
      for (FinishedTriggers finishedSet : finishedSets.values()) {
        if (finishedSet.isFinished(trigger)) {
          return true;
        }
      }
      return false;
    }

    /** Returns {@code true} vacuously when there are no merging windows. */
    @Override
    public boolean finishedInAllMergingWindows() {
      for (FinishedTriggers finishedSet : finishedSets.values()) {
        if (!finishedSet.isFinished(trigger)) {
          return false;
        }
      }
      return true;
    }
  }

  /** State accessor bound to the namespace of one window and one trigger index. */
  private class StateAccessorImpl implements StateAccessor<Object> {
    protected final int triggerIndex;
    protected final StateNamespace windowNamespace;

    public StateAccessorImpl(
        W window,
        ExecutableTrigger trigger) {
      // triggerIndex must be assigned first: namespaceFor(...) reads it.
      this.triggerIndex = trigger.getTriggerIndex();
      this.windowNamespace = namespaceFor(window);
    }

    protected StateNamespace namespaceFor(W window) {
      return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex);
    }

    @Override
    public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) {
      return stateInternals.state(windowNamespace, address);
    }
  }

  /** State accessor for a merge result that can also reach the state of each merging window. */
  private class MergingStateAccessorImpl extends StateAccessorImpl
      implements MergingStateAccessor<Object, W> {
    private final Collection<W> activeToBeMerged;

    public MergingStateAccessorImpl(ExecutableTrigger trigger, Collection<W> activeToBeMerged,
        W mergeResult) {
      super(mergeResult, trigger);
      this.activeToBeMerged = activeToBeMerged;
    }

    @Override
    public <StateT extends State> StateT access(
        StateTag<? super Object, StateT> address) {
      return stateInternals.state(windowNamespace, address);
    }

    @Override
    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
        StateTag<? super Object, StateT> address) {
      ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
      for (W mergingWindow : activeToBeMerged) {
        StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address);
        builder.put(mergingWindow, stateForWindow);
      }
      return builder.build();
    }
  }

  /** Basic trigger context for a single window. */
  private class TriggerContextImpl extends Trigger.TriggerContext {

    private final W window;
    private final StateAccessorImpl state;
    private final Timers timers;
    private final TriggerInfoImpl triggerInfo;

    private TriggerContextImpl(
        W window,
        Timers timers,
        ExecutableTrigger trigger,
        FinishedTriggers finishedSet) {
      // Qualified superclass constructor call: the superclass is an inner class of Trigger, so
      // the trigger spec is supplied as its enclosing instance.
      trigger.getSpec().super();
      this.window = window;
      this.state = new StateAccessorImpl(window, trigger);
      this.timers = new TriggerTimers(window, timers);
      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
    }

    @Override
    public Trigger.TriggerContext forTrigger(ExecutableTrigger trigger) {
      return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet);
    }

    @Override
    public TriggerInfo trigger() {
      return triggerInfo;
    }

    @Override
    public StateAccessor<?> state() {
      return state;
    }

    @Override
    public W window() {
      return window;
    }

    @Override
    public void deleteTimer(Instant timestamp, TimeDomain domain) {
      timers.deleteTimer(timestamp, domain);
    }

    @Override
    public Instant currentProcessingTime() {
      return timers.currentProcessingTime();
    }

    @Override
    @Nullable
    public Instant currentSynchronizedProcessingTime() {
      return timers.currentSynchronizedProcessingTime();
    }

    @Override
    @Nullable
    public Instant currentEventTime() {
      return timers.currentEventTime();
    }
  }

  /** Trigger context for processing one element; additionally exposes the element timestamp. */
  private class OnElementContextImpl extends Trigger.OnElementContext {

    private final W window;
    private final StateAccessorImpl state;
    private final Timers timers;
    private final TriggerInfoImpl triggerInfo;
    private final Instant eventTimestamp;

    private OnElementContextImpl(
        W window,
        Timers timers,
        ExecutableTrigger trigger,
        FinishedTriggers finishedSet,
        Instant eventTimestamp) {
      // See TriggerContextImpl: the trigger spec is the enclosing instance of the superclass.
      trigger.getSpec().super();
      this.window = window;
      this.state = new StateAccessorImpl(window, trigger);
      this.timers = new TriggerTimers(window, timers);
      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
      this.eventTimestamp = eventTimestamp;
    }

    @Override
    public Instant eventTimestamp() {
      return eventTimestamp;
    }

    @Override
    public Trigger.OnElementContext forTrigger(ExecutableTrigger trigger) {
      return new OnElementContextImpl(
          window, timers, trigger, triggerInfo.finishedSet, eventTimestamp);
    }

    @Override
    public TriggerInfo trigger() {
      return triggerInfo;
    }

    @Override
    public StateAccessor<?> state() {
      return state;
    }

    @Override
    public W window() {
      return window;
    }

    @Override
    public void setTimer(Instant timestamp, TimeDomain domain) {
      timers.setTimer(timestamp, domain);
    }

    @Override
    public void deleteTimer(Instant timestamp, TimeDomain domain) {
      timers.deleteTimer(timestamp, domain);
    }

    @Override
    public Instant currentProcessingTime() {
      return timers.currentProcessingTime();
    }

    @Override
    @Nullable
    public Instant currentSynchronizedProcessingTime() {
      return timers.currentSynchronizedProcessingTime();
    }

    @Override
    @Nullable
    public Instant currentEventTime() {
      return timers.currentEventTime();
    }
  }

  /** Trigger context for a merge; {@code window} is the merge result. */
  private class OnMergeContextImpl extends Trigger.OnMergeContext {
    private final MergingStateAccessor<?, W> state;
    private final W window;
    private final Collection<W> mergingWindows;
    private final Timers timers;
    private final MergingTriggerInfoImpl triggerInfo;

    private OnMergeContextImpl(
        W window,
        Timers timers,
        ExecutableTrigger trigger,
        FinishedTriggers finishedSet,
        Map<W, FinishedTriggers> finishedSets) {
      // See TriggerContextImpl: the trigger spec is the enclosing instance of the superclass.
      trigger.getSpec().super();
      // The windows being merged are exactly the keys of the per-window finished sets.
      this.mergingWindows = finishedSets.keySet();
      this.window = window;
      this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window);
      this.timers = new TriggerTimers(window, timers);
      this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets);
    }

    @Override
    public Trigger.OnMergeContext forTrigger(ExecutableTrigger trigger) {
      return new OnMergeContextImpl(
          window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets);
    }

    @Override
    public MergingStateAccessor<?, W> state() {
      return state;
    }

    @Override
    public MergingTriggerInfo trigger() {
      return triggerInfo;
    }

    @Override
    public W window() {
      return window;
    }

    @Override
    public void setTimer(Instant timestamp, TimeDomain domain) {
      timers.setTimer(timestamp, domain);
    }

    @Override
    public void deleteTimer(Instant timestamp, TimeDomain domain) {
      // Must delete, not set: forwarding to setTimer here would arm a timer whenever a trigger
      // asked for one to be removed during a merge. This matches the other context classes.
      timers.deleteTimer(timestamp, domain);
    }

    @Override
    public Instant currentProcessingTime() {
      return timers.currentProcessingTime();
    }

    @Override
    @Nullable
    public Instant currentSynchronizedProcessingTime() {
      return timers.currentSynchronizedProcessingTime();
    }

    @Override
    @Nullable
    public Instant currentEventTime() {
      return timers.currentEventTime();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java new file mode 100644 index 0000000..8d0f322 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.beam.runners.core;

import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.util.BitSetCoder;
import org.apache.beam.sdk.util.ExecutableTrigger;
import org.apache.beam.sdk.util.FinishedTriggers;
import org.apache.beam.sdk.util.FinishedTriggersBitSet;
import org.apache.beam.sdk.util.Timers;
import org.apache.beam.sdk.util.TriggerContextFactory;
import org.apache.beam.sdk.util.state.MergingStateAccessor;
import org.apache.beam.sdk.util.state.StateAccessor;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
import org.apache.beam.sdk.util.state.ValueState;
import org.joda.time.Instant;

/**
 * Executes a trigger while managing persistence of information about which subtriggers are
 * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
 *
 * <p>Specifically, the responsibilities are:
 *
 * <ul>
 *   <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
 *       constructing the appropriate trigger contexts.</li>
 *   <li>Committing a record of which subtriggers are finished to persistent state.</li>
 *   <li>Restoring the record of which subtriggers are finished from persistent state.</li>
 *   <li>Clearing out the persisted finished set when a caller indicates
 *       (via {@link #clearFinished}) that it is no longer needed.</li>
 * </ul>
 *
 * <p>These responsibilities are intertwined: trigger contexts include mutable information about
 * which subtriggers are finished. This class provides the information when building the contexts
 * and commits the information when the method of the {@link ExecutableTrigger} returns.
 *
 * @param <W> The kind of windows being processed.
 */
public class TriggerRunner<W extends BoundedWindow> {
  /** State tag under which the finished bits of the trigger tree are persisted. */
  @VisibleForTesting
  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
      StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));

  private final ExecutableTrigger rootTrigger;
  private final TriggerContextFactory<W> contextFactory;

  /**
   * Creates a runner for the given trigger tree.
   *
   * @param rootTrigger root of the trigger tree; its trigger index must be 0
   * @param contextFactory factory used to build the contexts passed to the trigger
   */
  public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
    checkState(rootTrigger.getTriggerIndex() == 0);
    this.rootTrigger = rootTrigger;
    this.contextFactory = contextFactory;
  }

  /**
   * Reads the finished bits from {@code state}, substituting an all-unfinished set when nothing
   * is stored or when the finished set is not needed at all.
   */
  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
    if (!isFinishedSetNeeded()) {
      // If no trigger in the tree will ever have finished bits, then we don't need to read them.
      // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
      // finished) for each trigger in the tree.
      return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
    }

    BitSet bitSet = state.read();
    return bitSet == null
        ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
            : FinishedTriggersBitSet.fromBitSet(bitSet);
  }


  /** Clears the persisted finished bits, unless the finished set is never written. */
  private void clearFinishedBits(ValueState<BitSet> state) {
    if (!isFinishedSetNeeded()) {
      // Nothing to clear.
      return;
    }
    state.clear();
  }

  /** Return true if the trigger is closed in the window corresponding to the specified state. */
  public boolean isClosed(StateAccessor<?> state) {
    return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
  }

  /** Prefetches the finished bits and whatever the trigger needs for an element arrival. */
  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
      justification = "prefetch side effect")
  public void prefetchForValue(W window, StateAccessor<?> state) {
    if (isFinishedSetNeeded()) {
      state.access(FINISHED_BITS_TAG).readLater();
    }
    rootTrigger.getSpec().prefetchOnElement(
        contextFactory.createStateAccessor(window, rootTrigger));
  }

  /** Prefetches the finished bits and whatever the trigger needs for an {@code onFire} call. */
  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
      justification = "prefetch side effect")
  public void prefetchOnFire(W window, StateAccessor<?> state) {
    if (isFinishedSetNeeded()) {
      state.access(FINISHED_BITS_TAG).readLater();
    }
    rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
  }

  /** Prefetches the finished bits and whatever the trigger needs for a {@code shouldFire} call. */
  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
      justification = "prefetch side effect")
  public void prefetchShouldFire(W window, StateAccessor<?> state) {
    if (isFinishedSetNeeded()) {
      state.access(FINISHED_BITS_TAG).readLater();
    }
    rootTrigger.getSpec().prefetchShouldFire(
        contextFactory.createStateAccessor(window, rootTrigger));
  }

  /**
   * Run the trigger logic to deal with a new value.
   */
  public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
      throws Exception {
    // Clone so that we can detect changes and so that changes here don't pollute merging.
    FinishedTriggersBitSet finishedSet =
        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
    Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext(
        window, timers, timestamp, rootTrigger, finishedSet);
    rootTrigger.invokeOnElement(triggerContext);
    persistFinishedSet(state, finishedSet);
  }

  /**
   * Prefetches the finished bits of every merging window and whatever the trigger needs for an
   * {@code onMerge} call.
   */
  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
      justification = "prefetch side effect")
  public void prefetchForMerge(
      W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
    if (isFinishedSetNeeded()) {
      for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
        value.readLater();
      }
    }
    rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
        window, mergingWindows, rootTrigger));
  }

  /**
   * Run the trigger merging logic as part of executing the specified merge.
   */
  public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
    // Clone so that we can detect changes and so that changes here don't pollute merging.
    FinishedTriggersBitSet finishedSet =
        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();

    // And read the finished bits in each merging window.
    ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
    for (Map.Entry<W, ValueState<BitSet>> entry :
        state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
      // Don't need to clone these, since the trigger context doesn't allow modification
      builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
      // Clear the underlying finished bits.
      clearFinishedBits(entry.getValue());
    }
    ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();

    Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
        window, timers, rootTrigger, finishedSet, mergingFinishedSets);

    // Run the merge from the trigger
    rootTrigger.invokeOnMerge(mergeContext);

    persistFinishedSet(state, finishedSet);
  }

  /**
   * Asks the trigger whether it is ready to fire in {@code window}. Works on a copy of the
   * finished bits and persists nothing.
   */
  public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
    Trigger.TriggerContext context = contextFactory.base(window, timers,
        rootTrigger, finishedSet);
    return rootTrigger.invokeShouldFire(context);
  }

  /** Notifies the trigger that it fired in {@code window} and persists any finished-bit change. */
  public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
    // NOTE(review): this comment used to read "shouldFire should be false", which looks
    // inverted - presumably onFire is only invoked once shouldFire has returned true; confirm
    // against the callers. Either way the precondition is too expensive to assert here.
    FinishedTriggersBitSet finishedSet =
        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
    Trigger.TriggerContext context = contextFactory.base(window, timers,
        rootTrigger, finishedSet);
    rootTrigger.invokeOnFire(context);
    persistFinishedSet(state, finishedSet);
  }

  /**
   * Writes {@code modifiedFinishedSet} back to state only if it differs from what is stored;
   * an empty set is persisted by clearing the state cell.
   */
  private void persistFinishedSet(
      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
    if (!isFinishedSetNeeded()) {
      return;
    }

    ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
    if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
      if (modifiedFinishedSet.getBitSet().isEmpty()) {
        finishedSetState.clear();
      } else {
        finishedSetState.write(modifiedFinishedSet.getBitSet());
      }
    }
  }

  /**
   * Clear the finished bits.
   */
  public void clearFinished(StateAccessor<?> state) {
    clearFinishedBits(state.access(FINISHED_BITS_TAG));
  }

  /**
   * Clear the state used for executing triggers, but leave the finished set to indicate
   * the window is closed.
   */
  public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
    // Don't need to clone, because we'll be clearing the finished bits anyways.
    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
    rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
  }

  /** Returns whether finished bits need to be read and written for this trigger tree. */
  private boolean isFinishedSetNeeded() {
    // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
    // lookup. Right now, we special case this for the DefaultTrigger.
    return !(rootTrigger.getSpec() instanceof DefaultTrigger);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java new file mode 100644 index 0000000..b591229 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms.windowing;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.apache.beam.sdk.util.TriggerTester;
import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link AfterAll}.
 */
@RunWith(JUnit4.class)
public class AfterAllTest {

  // Re-created by each test that drives a trigger through the tester.
  private SimpleTriggerTester<IntervalWindow> tester;

  /**
   * The first sub-trigger (count >= 1) is satisfied after one element, but the composite may
   * only fire once the second sub-trigger (count >= 2) is satisfied too.
   */
  @Test
  public void testT1FiresFirst() throws Exception {
    tester = TriggerTester.forTrigger(
        AfterAll.of(
            AfterPane.elementCountAtLeast(1),
            AfterPane.elementCountAtLeast(2)),
        FixedWindows.of(Duration.millis(100)));

    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));

    tester.injectElements(1);
    assertFalse(tester.shouldFire(window));

    tester.injectElements(2);
    assertTrue(tester.shouldFire(window));
    tester.fireIfShouldFire(window);
    assertTrue(tester.isMarkedFinished(window));
  }

  /**
   * Same scenario as {@link #testT1FiresFirst} with the sub-triggers in the opposite order, so
   * the second sub-trigger is the one satisfied first.
   */
  @Test
  public void testT2FiresFirst() throws Exception {
    tester = TriggerTester.forTrigger(
        AfterAll.of(
            AfterPane.elementCountAtLeast(2),
            AfterPane.elementCountAtLeast(1)),
        FixedWindows.of(Duration.millis(100)));

    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));

    tester.injectElements(1);
    assertFalse(tester.shouldFire(window));

    tester.injectElements(2);
    assertTrue(tester.shouldFire(window));
    tester.fireIfShouldFire(window);
    assertTrue(tester.isMarkedFinished(window));
  }

  /**
   * Tests that the AfterAll properly unsets finished bits when a merge causes it to become
   * unfinished.
   */
  @Test
  public void testOnMergeRewinds() throws Exception {
    tester = TriggerTester.forTrigger(
        AfterEach.inOrder(
            AfterAll.of(
                AfterWatermark.pastEndOfWindow(),
                AfterPane.elementCountAtLeast(1)),
            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
        Sessions.withGapDuration(Duration.millis(10)));

    tester.injectElements(1);
    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));

    tester.injectElements(5);
    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));

    // Finish the AfterAll in the first window
    tester.advanceInputWatermark(new Instant(11));
    assertTrue(tester.shouldFire(firstWindow));
    assertFalse(tester.shouldFire(secondWindow));
    tester.fireIfShouldFire(firstWindow);

    // Merge them; the AfterAll should not be finished
    tester.mergeWindows();
    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
    assertFalse(tester.isMarkedFinished(mergedWindow));

    // Confirm that we are back on the first trigger by probing that it is not ready to fire
    // after an element (with merging)
    tester.injectElements(3);
    tester.mergeWindows();
    assertFalse(tester.shouldFire(mergedWindow));

    // Fire the AfterAll in the merged window
    tester.advanceInputWatermark(new Instant(15));
    assertTrue(tester.shouldFire(mergedWindow));
    tester.fireIfShouldFire(mergedWindow);

    // Confirm that we are on the second trigger by probing
    tester.injectElements(2);
    tester.mergeWindows();
    assertTrue(tester.shouldFire(mergedWindow));
    tester.fireIfShouldFire(mergedWindow);
    tester.injectElements(2);
    tester.mergeWindows();
    assertTrue(tester.shouldFire(mergedWindow));
    tester.fireIfShouldFire(mergedWindow);
  }

  /**
   * An AfterAll that includes an element-count sub-trigger reports
   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} as its guaranteed-firing watermark.
   */
  @Test
  public void testFireDeadline() throws Exception {
    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));

    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
        AfterAll.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(1))
            .getWatermarkThatGuaranteesFiring(window));
  }

  /** The continuation of an AfterAll is the AfterAll of its sub-triggers' continuations. */
  @Test
  public void testContinuation() throws Exception {
    OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
    OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
    Trigger afterAll = AfterAll.of(trigger1, trigger2);
    assertEquals(
        AfterAll.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()),
        afterAll.getContinuationTrigger());
  }

  /** Pins the exact string rendering of an AfterAll built from two named stub triggers. */
  @Test
  public void testToString() {
    Trigger trigger = AfterAll.of(StubTrigger.named("t1"), StubTrigger.named("t2"));
    assertEquals("AfterAll.of(t1, t2)", trigger.toString());
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java new file mode 100644 index 0000000..c413c6e --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link AfterEach}. + */ +@RunWith(JUnit4.class) +public class AfterEachTest { + + private SimpleTriggerTester<IntervalWindow> tester; + + @Before + public void initMocks() { + MockitoAnnotations.initMocks(this); + } + + /** + * Tests that the {@link AfterEach} trigger fires and finishes the first trigger then the second. 
+ */ + @Test + public void testAfterEachInSequence() throws Exception { + tester = TriggerTester.forTrigger( + AfterEach.inOrder( + Repeatedly.forever(AfterPane.elementCountAtLeast(2)) + .orFinally(AfterPane.elementCountAtLeast(3)), + Repeatedly.forever(AfterPane.elementCountAtLeast(5)) + .orFinally(AfterWatermark.pastEndOfWindow())), + FixedWindows.of(Duration.millis(10))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + // AfterCount(2) not ready + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + // AfterCount(2) ready, not finished + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // orFinally(AfterCount(3)) ready and will finish the first + tester.injectElements(1, 2, 3); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // Now running as the second trigger + assertFalse(tester.shouldFire(window)); + // This quantity of elements would fire and finish if it were erroneously still the first + tester.injectElements(1, 2, 3, 4); + assertFalse(tester.shouldFire(window)); + + // Now fire once + tester.injectElements(5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // This time advance the watermark to finish the whole mess. 
+ tester.advanceInputWatermark(new Instant(10)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testFireDeadline() throws Exception { + BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + assertEquals(new Instant(9), + AfterEach.inOrder(AfterWatermark.pastEndOfWindow(), + AfterPane.elementCountAtLeast(4)) + .getWatermarkThatGuaranteesFiring(window)); + + assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, + AfterEach.inOrder(AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()) + .getWatermarkThatGuaranteesFiring(window)); + } + + @Test + public void testContinuation() throws Exception { + OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); + OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); + Trigger afterEach = AfterEach.inOrder(trigger1, trigger2); + assertEquals( + Repeatedly.forever(AfterFirst.of( + trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger())), + afterEach.getContinuationTrigger()); + } + + @Test + public void testToString() { + Trigger trigger = AfterEach.inOrder( + StubTrigger.named("t1"), + StubTrigger.named("t2"), + StubTrigger.named("t3")); + + assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java new file mode 100644 index 0000000..415060b --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link AfterFirst}. 
+ */ +@RunWith(JUnit4.class) +public class AfterFirstTest { + + @Mock private OnceTrigger mockTrigger1; + @Mock private OnceTrigger mockTrigger2; + private SimpleTriggerTester<IntervalWindow> tester; + private static Trigger.TriggerContext anyTriggerContext() { + return Mockito.<Trigger.TriggerContext>any(); + } + + @Before + public void initMocks() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testNeitherShouldFireFixedWindows() throws Exception { + tester = TriggerTester.forTrigger( + AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); + + tester.injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false); + when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false); + + assertFalse(tester.shouldFire(window)); // should not fire + assertFalse(tester.isMarkedFinished(window)); // not finished + } + + @Test + public void testOnlyT1ShouldFireFixedWindows() throws Exception { + tester = TriggerTester.forTrigger( + AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); + tester.injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); + + when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false); + + assertTrue(tester.shouldFire(window)); // should fire + + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testOnlyT2ShouldFireFixedWindows() throws Exception { + tester = TriggerTester.forTrigger( + AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); + tester.injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); + + when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false); + 
when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true); + assertTrue(tester.shouldFire(window)); // should fire + + tester.fireIfShouldFire(window); // now finished + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testBothShouldFireFixedWindows() throws Exception { + tester = TriggerTester.forTrigger( + AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); + tester.injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); + + when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true); + assertTrue(tester.shouldFire(window)); // should fire + + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + /** + * Tests that if the first trigger rewinds to be non-finished in the merged window, + * then it becomes the currently active trigger again, with real triggers. + */ + @Test + public void testShouldFireAfterMerge() throws Exception { + tester = TriggerTester.forTrigger( + AfterEach.inOrder( + AfterFirst.of(AfterPane.elementCountAtLeast(5), + AfterWatermark.pastEndOfWindow()), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + // Finished the AfterFirst in the first window + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + tester.fireIfShouldFire(firstWindow); + + // Set up second window where it is not done + tester.injectElements(5); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + assertFalse(tester.shouldFire(secondWindow)); + + // Merge them, if the merged window were on the second trigger, it would be ready + tester.mergeWindows(); + IntervalWindow 
mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + assertFalse(tester.shouldFire(mergedWindow)); + + // Now adding 5 more elements makes the AfterFirst ready to fire + tester.injectElements(1, 2, 3, 4, 5); + tester.mergeWindows(); + assertTrue(tester.shouldFire(mergedWindow)); + } + + @Test + public void testFireDeadline() throws Exception { + BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + assertEquals(new Instant(9), + AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(4)) + .getWatermarkThatGuaranteesFiring(window)); + assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, + AfterFirst.of(AfterPane.elementCountAtLeast(2), AfterPane.elementCountAtLeast(1)) + .getWatermarkThatGuaranteesFiring(window)); + } + + @Test + public void testContinuation() throws Exception { + OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); + OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); + Trigger afterFirst = AfterFirst.of(trigger1, trigger2); + assertEquals( + AfterFirst.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()), + afterFirst.getContinuationTrigger()); + } + + @Test + public void testToString() { + Trigger trigger = AfterFirst.of(StubTrigger.named("t1"), StubTrigger.named("t2")); + assertEquals("AfterFirst.of(t1, t2)", trigger.toString()); + } +}