http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java new file mode 100644 index 0000000..2490463 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Instant; + +/** + * {@code AfterProcessingTime} triggers fire based on the current processing time. They operate in + * the real-time domain. 
+ * + * <p>The time at which to fire the timer can be adjusted via the methods in {@link + * AfterDelayFromFirstElementStateMachine}, such as {@link + * AfterDelayFromFirstElementStateMachine#plusDelayOf} or {@link + * AfterDelayFromFirstElementStateMachine#alignedTo}. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class AfterProcessingTimeStateMachine extends AfterDelayFromFirstElementStateMachine { + + /** Returns the current processing time reported by {@code context}. */ + @Override + @Nullable + public Instant getCurrentTime(TriggerStateMachine.TriggerContext context) { + return context.currentProcessingTime(); + } + + /** Passes {@link TimeDomain#PROCESSING_TIME} and {@code transforms} to the superclass. */ + private AfterProcessingTimeStateMachine(List<SerializableFunction<Instant, Instant>> transforms) { + super(TimeDomain.PROCESSING_TIME, transforms); + } + + /** + * Creates a trigger that fires when the current processing time passes the processing time + * at which this trigger saw the first element in a pane. + */ + public static AfterProcessingTimeStateMachine pastFirstElementInPane() { + return new AfterProcessingTimeStateMachine(IDENTITY); + } + + /** Returns a new instance of this class constructed with {@code transforms}. */ + @Override + protected AfterProcessingTimeStateMachine newWith( + List<SerializableFunction<Instant, Instant>> transforms) { + return new AfterProcessingTimeStateMachine(transforms); + } + + /** + * Renders {@code AfterProcessingTime.pastFirstElementInPane()} followed by one + * {@code .plusDelayOf(...)} suffix per timestamp mapper (every mapper is printed this way). + */ + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()"); + for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) { + builder + .append(".plusDelayOf(") + .append(delayFn) + .append(")"); + } + + return builder.toString(); + } + + /** Two instances of this class are equal when their timestamp mappers are equal. */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof AfterProcessingTimeStateMachine)) { + return false; + } + AfterProcessingTimeStateMachine that = (AfterProcessingTimeStateMachine) obj; + return Objects.equals(this.timestampMappers, that.timestampMappers); + } + + /** Hashes the runtime class together with the timestamp mappers. */ + @Override + public int hashCode() { + return Objects.hash(getClass(), this.timestampMappers); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java new file mode 100644 index 0000000..000f6e7 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import com.google.common.base.Objects; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Instant; + +/** + * An {@link AfterDelayFromFirstElementStateMachine} whose current time is the synchronized + * processing time ({@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME}). + * + * <p>It is constructed with an empty list of timestamp transforms and {@link #newWith} ignores + * its argument, so all instances compare equal. + */ +class AfterSynchronizedProcessingTimeStateMachine extends AfterDelayFromFirstElementStateMachine { + + /** Returns the current synchronized processing time reported by {@code context}. */ + @Override + @Nullable + public Instant getCurrentTime(TriggerStateMachine.TriggerContext context) { + return context.currentSynchronizedProcessingTime(); + } + + /** Creates the trigger with no timestamp transforms. */ + public AfterSynchronizedProcessingTimeStateMachine() { + super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, + Collections.<SerializableFunction<Instant, Instant>>emptyList()); + } + + @Override + public String toString() { + return "AfterSynchronizedProcessingTime.pastFirstElementInPane()"; + } + + /** Any two instances of this class are equal. */ + @Override + public boolean equals(Object obj) { + return this == obj || obj instanceof AfterSynchronizedProcessingTimeStateMachine; + } + + /** Hash derived from the class only, matching {@link #equals}. */ + @Override + public int hashCode() { + return Objects.hashCode(AfterSynchronizedProcessingTimeStateMachine.class); + } + + /** Returns this instance unchanged; {@code transforms} is not used. */ + @Override + protected AfterSynchronizedProcessingTimeStateMachine + newWith(List<SerializableFunction<Instant, Instant>> transforms) { + // ignore transforms + return this; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java new file mode 100644 index 0000000..5ad6214 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java @@ -0,0 +1,325 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableList; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.util.TimeDomain; + +/** + * {@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a + * lower-bound, sometimes heuristically established, on event times that have been fully processed + * by the pipeline. + * + * <p>For sources that provide non-heuristic watermarks (e.g. + * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the + * watermark is a strict guarantee that no data with an event time earlier than + * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any + * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end + * of the window will be the last pane ever for that window. 
+ * + * <p>For sources that provide heuristic watermarks (e.g. + * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the + * watermark itself becomes an <i>estimate</i> that no data with an event time earlier than that + * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can + * often be quite accurate, but the chance of seeing late data for any given window is non-zero. + * Thus, if absolute correctness over time is important to your use case, you may want to consider + * using a trigger that accounts for late data. The default trigger, + * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires + * once when the watermark passes the end of the window and then immediately thereafter when any + * late data arrives, is one such example. + * + * <p>The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}. + * + * <p>Additional firings before or after the watermark can be requested by calling + * {@code AfterWatermark.pastEndOfWindow().withEarlyFirings(OnceTrigger)} or + * {@code AfterWatermark.pastEndOfWindow().withLateFirings(OnceTrigger)}. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class AfterWatermarkStateMachine { + + private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()"; + + // Static factory class. + private AfterWatermarkStateMachine() {} + + /** + * Creates a trigger that fires when the watermark passes the end of the window. 
+ */ + public static FromEndOfWindow pastEndOfWindow() { + return new FromEndOfWindow(); + } + + /** + * @see AfterWatermarkStateMachine + */ + public static class AfterWatermarkEarlyAndLate extends TriggerStateMachine { + + // Positions of the early and late triggers in the sub-trigger list passed to the superclass. + private static final int EARLY_INDEX = 0; + private static final int LATE_INDEX = 1; + + private final OnceTriggerStateMachine earlyTrigger; + @Nullable + private final OnceTriggerStateMachine lateTrigger; + + @SuppressWarnings("unchecked") + private AfterWatermarkEarlyAndLate( + OnceTriggerStateMachine earlyTrigger, OnceTriggerStateMachine lateTrigger) { + super( + lateTrigger == null + ? ImmutableList.<TriggerStateMachine>of(earlyTrigger) + : ImmutableList.<TriggerStateMachine>of(earlyTrigger, lateTrigger)); + this.earlyTrigger = checkNotNull(earlyTrigger, "earlyTrigger should not be null"); + this.lateTrigger = lateTrigger; + } + + /** Returns a new trigger using {@code earlyTrigger} and this trigger's current late trigger. */ + public TriggerStateMachine withEarlyFirings(OnceTriggerStateMachine earlyTrigger) { + return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); + } + + /** Returns a new trigger using this trigger's current early trigger and {@code lateTrigger}. */ + public TriggerStateMachine withLateFirings(OnceTriggerStateMachine lateTrigger) { + return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + if (!c.trigger().isMerging()) { + // If merges can never happen, we just run the unfinished subtrigger + c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c); + } else { + // If merges can happen, we run for all subtriggers because they might be + // de-activated or re-activated + for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) { + subTrigger.invokeOnElement(c); + } + } + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + // NOTE that the ReduceFnRunner will delete all end-of-window timers for the + // merged-away windows. 
+ + ExecutableTriggerStateMachine earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX); + // We check the early trigger to determine if we are still processing it or + // if the end of window has transitioned us to the late trigger + OnMergeContext earlyContext = c.forTrigger(earlySubtrigger); + + // If the early trigger is still active in any merging window then it is still active in + // the new merged window, because even if the merged window is "done" some pending elements + // haven't had a chance to fire. + if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) { + earlyContext.trigger().setFinished(false); + if (lateTrigger != null) { + ExecutableTriggerStateMachine lateSubtrigger = c.trigger().subTrigger(LATE_INDEX); + OnMergeContext lateContext = c.forTrigger(lateSubtrigger); + lateContext.trigger().setFinished(false); + lateSubtrigger.invokeClear(lateContext); + } + } else { + // Otherwise the early trigger and end-of-window bit is done for good. + earlyContext.trigger().setFinished(true); + if (lateTrigger != null) { + c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c); + } + } + } + + /** + * Returns true when the current event time is known and is strictly after the window's + * maximum timestamp. + */ + private boolean endOfWindowReached(TriggerStateMachine.TriggerContext context) { + return context.currentEventTime() != null + && context.currentEventTime().isAfter(context.window().maxTimestamp()); + } + + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + if (!context.trigger().isFinished(EARLY_INDEX)) { + // We have not yet transitioned to late firings. + // We should fire if either the trigger is ready or we reach the end of the window. 
+ return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context) + || endOfWindowReached(context); + } else if (lateTrigger == null) { + return false; + } else { + // We are running the late trigger + return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context); + } + } + + @Override + public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { + if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) { + onNonLateFiring(context); + } else if (lateTrigger != null) { + onLateFiring(context); + } else { + // all done + context.trigger().setFinished(true); + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(TO_STRING); + + if (!(earlyTrigger instanceof NeverStateMachine)) { + builder + .append(".withEarlyFirings(") + .append(earlyTrigger) + .append(")"); + } + + if (lateTrigger != null && !(lateTrigger instanceof NeverStateMachine)) { + builder + .append(".withLateFirings(") + .append(lateTrigger) + .append(")"); + } + + return builder.toString(); + } + + private void onNonLateFiring(TriggerStateMachine.TriggerContext context) throws Exception { + // We have not yet transitioned to late firings. 
+ ExecutableTriggerStateMachine earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX); + TriggerStateMachine.TriggerContext earlyContext = context.forTrigger(earlySubtrigger); + + if (!endOfWindowReached(context)) { + // This is an early firing, since we have not arrived at the end of the window + // Implicitly repeats + earlySubtrigger.invokeOnFire(context); + earlySubtrigger.invokeClear(context); + earlyContext.trigger().setFinished(false); + } else { + // We have arrived at the end of the window; terminate the early trigger + // and clear out the late trigger's state + if (earlySubtrigger.invokeShouldFire(context)) { + earlySubtrigger.invokeOnFire(context); + } + earlyContext.trigger().setFinished(true); + earlySubtrigger.invokeClear(context); + + if (lateTrigger == null) { + // Done if there is no late trigger. + context.trigger().setFinished(true); + } else { + // If there is a late trigger, we transition to it, and need to clear its state + // because it was run in parallel. + context.trigger().subTrigger(LATE_INDEX).invokeClear(context); + } + } + + } + + private void onLateFiring(TriggerStateMachine.TriggerContext context) throws Exception { + // We are firing the late trigger, with implicit repeat + ExecutableTriggerStateMachine lateSubtrigger = context.trigger().subTrigger(LATE_INDEX); + lateSubtrigger.invokeOnFire(context); + // It is a OnceTrigger, so it must have finished; unfinished it and clear it + lateSubtrigger.invokeClear(context); + context.forTrigger(lateSubtrigger).trigger().setFinished(false); + } + } + + /** + * A watermark trigger targeted relative to the end of the window. + */ + public static class FromEndOfWindow extends OnceTriggerStateMachine { + + private FromEndOfWindow() { + super(null); + } + + /** + * Creates a new {@code Trigger} like this one, except that it fires repeatedly whenever + * the given {@code Trigger} fires before the watermark has passed the end of the window. 
+ */ + public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTriggerStateMachine earlyFirings) { + checkNotNull(earlyFirings, "Must specify the trigger to use for early firings"); + return new AfterWatermarkEarlyAndLate(earlyFirings, null); + } + + /** + * Creates a new {@code Trigger} like this one, except that it fires repeatedly whenever + * the given {@code Trigger} fires after the watermark has passed the end of the window. + */ + public AfterWatermarkEarlyAndLate withLateFirings(OnceTriggerStateMachine lateFirings) { + checkNotNull(lateFirings, "Must specify the trigger to use for late firings"); + return new AfterWatermarkEarlyAndLate(NeverStateMachine.ever(), lateFirings); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + // We're interested in knowing when the input watermark passes the end of the window. + // (It is possible this has already happened, in which case the timer will be fired + // almost immediately). + c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + // NOTE that the ReduceFnRunner will delete all end-of-window timers for the + // merged-away windows. + + if (!c.trigger().finishedInAllMergingWindows()) { + // If the trigger is still active in any merging window then it is still active in the new + // merged window, because even if the merged window is "done" some pending elements haven't + // had a chance to fire + c.trigger().setFinished(false); + } else if (!endOfWindowReached(c)) { + // If the end of the new window has not been reached, then the trigger is active again. 
+ c.trigger().setFinished(false); + } else { + // Otherwise it is done for good + c.trigger().setFinished(true); + } + } + + @Override + public String toString() { + return TO_STRING; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof FromEndOfWindow; + } + + @Override + public int hashCode() { + return Objects.hash(getClass()); + } + + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + return endOfWindowReached(context); + } + + /** + * Returns true when the current event time is known and is strictly after the window's + * maximum timestamp. + */ + private boolean endOfWindowReached(TriggerStateMachine.TriggerContext context) { + return context.currentEventTime() != null + && context.currentEventTime().isAfter(context.window().maxTimestamp()); + } + + @Override + protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java new file mode 100644 index 0000000..be4dd68 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.util.TimeDomain; + +/** + * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. See + * {@link RepeatedlyStateMachine#forever} and {@link AfterWatermarkStateMachine#pastEndOfWindow} for + * more details. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class DefaultTriggerStateMachine extends TriggerStateMachine { + + /** Passes {@code null} to the superclass constructor; use {@link #of} to obtain an instance. */ + private DefaultTriggerStateMachine() { + super(null); + } + + /** + * Returns the default trigger. + */ + public static DefaultTriggerStateMachine of() { + return new DefaultTriggerStateMachine(); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + // If the end of the window has already been reached, then we are already ready to fire + // and do not need to set a wake-up timer. + if (!endOfWindowReached(c)) { + c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); + } + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + // If the end of the window has already been reached, then we are already ready to fire + // and do not need to set a wake-up timer. 
+ if (!endOfWindowReached(c)) { + c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); + } + } + + /** No-op. */ + @Override + public void clear(TriggerContext c) throws Exception { } + + @Override + public boolean isCompatible(TriggerStateMachine other) { + // Semantically, all default triggers are identical + return other instanceof DefaultTriggerStateMachine; + } + + /** Fires once the end of the window has been reached. */ + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + return endOfWindowReached(context); + } + + /** + * Returns true when the current event time is known and is strictly after the window's + * maximum timestamp. + */ + private boolean endOfWindowReached(TriggerStateMachine.TriggerContext context) { + return context.currentEventTime() != null + && context.currentEventTime().isAfter(context.window().maxTimestamp()); + } + + /** No-op; firing does not mark this trigger finished. */ + @Override + public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java new file mode 100644 index 0000000..c4d89c2 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; + +/** + * A wrapper around a trigger used during execution. While an actual trigger may appear multiple + * times (both in the same trigger expression and in other trigger expressions), the + * {@code ExecutableTrigger} wrapped around them forms a tree (only one occurrence). + */ +public class ExecutableTriggerStateMachine implements Serializable { + + /** Store the index assigned to this trigger. 
*/ + private final int triggerIndex; + /** The first index that is not used by this trigger or by any trigger in its subtree. */ + private final int firstIndexAfterSubtree; + private final List<ExecutableTriggerStateMachine> subTriggers = new ArrayList<>(); + private final TriggerStateMachine trigger; + + /** Wraps {@code trigger} and its sub-triggers, assigning indices starting at 0. */ + public static <W extends BoundedWindow> ExecutableTriggerStateMachine create( + TriggerStateMachine trigger) { + return create(trigger, 0); + } + + /** Wraps {@code trigger}, using the once-trigger wrapper when it is a once-trigger. */ + private static <W extends BoundedWindow> ExecutableTriggerStateMachine create( + TriggerStateMachine trigger, int nextUnusedIndex) { + if (trigger instanceof OnceTriggerStateMachine) { + return new ExecutableOnceTriggerStateMachine( + (OnceTriggerStateMachine) trigger, nextUnusedIndex); + } else { + return new ExecutableTriggerStateMachine(trigger, nextUnusedIndex); + } + } + + public static <W extends BoundedWindow> ExecutableTriggerStateMachine createForOnceTrigger( + OnceTriggerStateMachine trigger, int nextUnusedIndex) { + return new ExecutableOnceTriggerStateMachine(trigger, nextUnusedIndex); + } + + private ExecutableTriggerStateMachine(TriggerStateMachine trigger, int nextUnusedIndex) { + this.trigger = checkNotNull(trigger, "trigger must not be null"); + // Indices are assigned depth-first: this trigger takes the next unused index, then each + // sub-trigger's subtree takes the indices that follow. + this.triggerIndex = nextUnusedIndex++; + + if (trigger.subTriggers() != null) { + for (TriggerStateMachine subTrigger : trigger.subTriggers()) { + ExecutableTriggerStateMachine subExecutable = create(subTrigger, nextUnusedIndex); + subTriggers.add(subExecutable); + nextUnusedIndex = subExecutable.firstIndexAfterSubtree; + } + } + firstIndexAfterSubtree = nextUnusedIndex; + } + + /** Returns the executable wrappers of the direct sub-triggers (the internal list, not a copy). */ + public List<ExecutableTriggerStateMachine> subTriggers() { + return subTriggers; + } + + @Override + public String toString() { + return trigger.toString(); + } + + /** + * Return the underlying trigger specification corresponding to this {@code ExecutableTrigger}. 
+ */ + public TriggerStateMachine getSpec() { + return trigger; + } + + /** Returns the index assigned to this trigger. */ + public int getTriggerIndex() { + return triggerIndex; + } + + /** Returns the first index after the range used by this trigger and its subtree. */ + public final int getFirstIndexAfterSubtree() { + return firstIndexAfterSubtree; + } + + /** Delegates to {@link TriggerStateMachine#isCompatible} on the two wrapped triggers. */ + public boolean isCompatible(ExecutableTriggerStateMachine other) { + return trigger.isCompatible(other.trigger); + } + + /** + * Returns the direct sub-trigger whose index range contains {@code index}. + * + * @throws IllegalStateException if {@code index} is not strictly greater than this trigger's + * index and strictly less than the first index after its subtree + */ + public ExecutableTriggerStateMachine getSubTriggerContaining(int index) { + checkNotNull(subTriggers); + checkState(index > triggerIndex && index < firstIndexAfterSubtree, + "Cannot find sub-trigger containing index not in this tree."); + ExecutableTriggerStateMachine previous = null; + for (ExecutableTriggerStateMachine subTrigger : subTriggers) { + if (index < subTrigger.triggerIndex) { + return previous; + } + previous = subTrigger; + } + return previous; + } + + /** + * Invoke the {@link TriggerStateMachine#onElement} method for this trigger, ensuring that the + * bits are properly updated if the trigger finishes. + */ + public void invokeOnElement(TriggerStateMachine.OnElementContext c) throws Exception { + trigger.onElement(c.forTrigger(this)); + } + + /** + * Invoke the {@link TriggerStateMachine#onMerge} method for this trigger, ensuring that the bits + * are properly updated. + */ + public void invokeOnMerge(TriggerStateMachine.OnMergeContext c) throws Exception { + TriggerStateMachine.OnMergeContext subContext = c.forTrigger(this); + trigger.onMerge(subContext); + } + + /** Invoke {@link TriggerStateMachine#shouldFire} with the context narrowed to this trigger. */ + public boolean invokeShouldFire(TriggerStateMachine.TriggerContext c) throws Exception { + return trigger.shouldFire(c.forTrigger(this)); + } + + /** Invoke {@link TriggerStateMachine#onFire} with the context narrowed to this trigger. */ + public void invokeOnFire(TriggerStateMachine.TriggerContext c) throws Exception { + trigger.onFire(c.forTrigger(this)); + } + + /** + * Invoke clear for this trigger. 
+ */ + public void invokeClear(TriggerStateMachine.TriggerContext c) throws Exception { + trigger.clear(c.forTrigger(this)); + } + + /** + * {@link ExecutableTriggerStateMachine} that enforces the fact that the trigger should always + * FIRE_AND_FINISH and never just FIRE. + */ + private static class ExecutableOnceTriggerStateMachine extends ExecutableTriggerStateMachine { + + public ExecutableOnceTriggerStateMachine(OnceTriggerStateMachine trigger, int nextUnusedIndex) { + super(trigger, nextUnusedIndex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggers.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggers.java new file mode 100644 index 0000000..1098716 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggers.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +/** + * A mutable set which tracks whether any particular {@link ExecutableTriggerStateMachine} is + * finished. + */ +public interface FinishedTriggers { + /** + * Returns {@code true} if the trigger is finished. + */ + public boolean isFinished(ExecutableTriggerStateMachine trigger); + + /** + * Sets whether the trigger is finished: {@code true} marks it finished, {@code false} unfinished. + */ + public void setFinished(ExecutableTriggerStateMachine trigger, boolean value); + + /** + * Sets the trigger and all of its subtriggers to unfinished. + */ + public void clearRecursively(ExecutableTriggerStateMachine trigger); + + /** + * Create an independent copy of this mutable {@link FinishedTriggers}. + */ + public FinishedTriggers copy(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java new file mode 100644 index 0000000..0f74969 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import java.util.BitSet; + +/** + * A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}. + */ +public class FinishedTriggersBitSet implements FinishedTriggers { + + private final BitSet bitSet; + + private FinishedTriggersBitSet(BitSet bitSet) { + this.bitSet = bitSet; + } + + public static FinishedTriggersBitSet emptyWithCapacity(int capacity) { + return new FinishedTriggersBitSet(new BitSet(capacity)); + } + + public static FinishedTriggersBitSet fromBitSet(BitSet bitSet) { + return new FinishedTriggersBitSet(bitSet); + } + + /** + * Returns the underlying {@link BitSet} for this {@link FinishedTriggersBitSet}. 
+ */ + public BitSet getBitSet() { + return bitSet; + } + + @Override + public boolean isFinished(ExecutableTriggerStateMachine trigger) { + return bitSet.get(trigger.getTriggerIndex()); + } + + @Override + public void setFinished(ExecutableTriggerStateMachine trigger, boolean value) { + bitSet.set(trigger.getTriggerIndex(), value); + } + + @Override + public void clearRecursively(ExecutableTriggerStateMachine trigger) { + bitSet.clear(trigger.getTriggerIndex(), trigger.getFirstIndexAfterSubtree()); + } + + @Override + public FinishedTriggersBitSet copy() { + return new FinishedTriggersBitSet((BitSet) bitSet.clone()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersSet.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersSet.java new file mode 100644 index 0000000..95c35f2 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersSet.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import com.google.common.collect.Sets; +import java.util.Set; + +/** + * An implementation of {@link FinishedTriggers} atop a user-provided mutable {@link Set}. + */ +public class FinishedTriggersSet implements FinishedTriggers { + + private final Set<ExecutableTriggerStateMachine> finishedTriggers; + + private FinishedTriggersSet(Set<ExecutableTriggerStateMachine> finishedTriggers) { + this.finishedTriggers = finishedTriggers; + } + + public static FinishedTriggersSet fromSet(Set<ExecutableTriggerStateMachine> finishedTriggers) { + return new FinishedTriggersSet(finishedTriggers); + } + + /** + * Returns a mutable {@link Set} of the underlying triggers that are finished. + */ + public Set<ExecutableTriggerStateMachine> getFinishedTriggers() { + return finishedTriggers; + } + + @Override + public boolean isFinished(ExecutableTriggerStateMachine trigger) { + return finishedTriggers.contains(trigger); + } + + @Override + public void setFinished(ExecutableTriggerStateMachine trigger, boolean value) { + if (value) { + finishedTriggers.add(trigger); + } else { + finishedTriggers.remove(trigger); + } + } + + @Override + public void clearRecursively(ExecutableTriggerStateMachine trigger) { + finishedTriggers.remove(trigger); + for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) { + clearRecursively(subTrigger); + } + } + + @Override + public FinishedTriggersSet copy() { + return fromSet(Sets.newHashSet(finishedTriggers)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java new file mode 100644 index 0000000..f32c7a8 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; + +/** + * A {@link TriggerStateMachine} which never fires. + * + * <p>Using this trigger will only produce output when the watermark passes the end of the + * {@link BoundedWindow window} plus the allowed lateness. + */ +public final class NeverStateMachine extends OnceTriggerStateMachine { + /** + * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey} + * when the {@link BoundedWindow} closes. + */ + public static NeverStateMachine ever() { + // NeverTrigger ignores all inputs and is Window-type independent. 
+ return new NeverStateMachine(); + } + + private NeverStateMachine() { + super(null); + } + + @Override + public void onElement(OnElementContext c) {} + + @Override + public void onMerge(OnMergeContext c) {} + + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) { + return false; + } + + @Override + protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) { + throw new UnsupportedOperationException( + String.format("%s should never fire", getClass().getSimpleName())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java new file mode 100644 index 0000000..f9ec5e7 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Arrays; + +/** + * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires. + */ +class OrFinallyStateMachine extends TriggerStateMachine { + + private static final int ACTUAL = 0; + private static final int UNTIL = 1; + + @VisibleForTesting + OrFinallyStateMachine(TriggerStateMachine actual, OnceTriggerStateMachine until) { + super(Arrays.asList(actual, until)); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + c.trigger().subTrigger(ACTUAL).invokeOnElement(c); + c.trigger().subTrigger(UNTIL).invokeOnElement(c); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) { + subTrigger.invokeOnMerge(c); + } + updateFinishedState(c); + } + + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context) + || context.trigger().subTrigger(UNTIL).invokeShouldFire(context); + } + + @Override + public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { + ExecutableTriggerStateMachine actualSubtrigger = context.trigger().subTrigger(ACTUAL); + ExecutableTriggerStateMachine untilSubtrigger = context.trigger().subTrigger(UNTIL); + + if (untilSubtrigger.invokeShouldFire(context)) { + untilSubtrigger.invokeOnFire(context); + actualSubtrigger.invokeClear(context); + } else { + // If until didn't fire, then the actual must have (or it is forbidden to call + // onFire) so we are done only if actual is done. + actualSubtrigger.invokeOnFire(context); + // Do not clear the until trigger, because it tracks data cross firings. 
+ } + updateFinishedState(context); + } + + @Override + public String toString() { + return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL)); + } + + private void updateFinishedState(TriggerContext c) throws Exception { + boolean anyStillFinished = false; + for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) { + anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished(); + } + c.trigger().setFinished(anyStillFinished); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachine.java new file mode 100644 index 0000000..35b701c --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachine.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import java.util.Arrays; + +/** + * Repeat a trigger, either until some condition is met or forever. + * + * <p>For example, to fire after the end of the window, and every time late data arrives: + * <pre> {@code + * Repeatedly.forever(AfterWatermark.isPastEndOfWindow()); + * } </pre> + * + * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite + * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}. + */ +public class RepeatedlyStateMachine extends TriggerStateMachine { + + private static final int REPEATED = 0; + + /** + * Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each + * time it fires and ignoring any indications to finish. + * + * <p>Unless used with {@link TriggerStateMachine#orFinally} the composite trigger will never + * finish. + * + * @param repeated the trigger to execute repeatedly. + */ + public static RepeatedlyStateMachine forever(TriggerStateMachine repeated) { + return new RepeatedlyStateMachine(repeated); + } + + private RepeatedlyStateMachine(TriggerStateMachine repeated) { + super(Arrays.asList(repeated)); + } + + + @Override + public void onElement(OnElementContext c) throws Exception { + getRepeated(c).invokeOnElement(c); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + getRepeated(c).invokeOnMerge(c); + } + + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + return getRepeated(context).invokeShouldFire(context); + } + + @Override + public void onFire(TriggerContext context) throws Exception { + getRepeated(context).invokeOnFire(context); + + if (context.trigger().isFinished(REPEATED)) { + // Reset tree will recursively clear the finished bits, and invoke clear. 
+ context.forTrigger(getRepeated(context)).trigger().resetTree(); + } + } + + @Override + public String toString() { + return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED)); + } + + private ExecutableTriggerStateMachine getRepeated(TriggerContext context) { + return context.trigger().subTrigger(REPEATED); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java new file mode 100644 index 0000000..fc9f203 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import org.apache.beam.sdk.util.Reshuffle; + +/** + * The trigger used with {@link Reshuffle} which triggers on every element + * and never buffers state. + */ +public class ReshuffleTriggerStateMachine extends TriggerStateMachine { + + public ReshuffleTriggerStateMachine() { + super(null); + } + + @Override + public void onElement(TriggerStateMachine.OnElementContext c) { } + + @Override + public void onMerge(TriggerStateMachine.OnMergeContext c) { } + + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + return true; + } + + @Override + public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { } + + @Override + public String toString() { + return "ReshuffleTriggerStateMachine()"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java new file mode 100644 index 0000000..f8def20 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import com.google.common.base.Joiner; +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.joda.time.Instant; + +/** + * {@code Trigger}s control when the elements for a specific key and window are output. As elements + * arrive, they are put into one or more windows by a {@link Window} transform and its associated + * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the {@code + * Window}s contents should be output. + * + * <p>See {@link org.apache.beam.sdk.transforms.GroupByKey} and {@link Window} for more information + * about how grouping with windows works. + * + * <p>The elements that are assigned to a window since the last time it was fired (or since the + * window was created) are placed into the current window pane. Triggers are evaluated against the + * elements as they are added. When the root trigger fires, the elements in the current pane will be + * output. When the root trigger finishes (indicating it will never fire again), the window is + * closed and any new elements assigned to that window are discarded. 
+ * + * <p>Several predefined {@code Trigger}s are provided: + * + * <ul> + * <li> {@link AfterWatermarkStateMachine} for firing when the watermark passes a timestamp + * determined from either the end of the window or the arrival of the first element in a pane. + * <li> {@link AfterProcessingTimeStateMachine} for firing after some amount of processing time has + * elapsed (typically since the first element in a pane). + * <li> {@link AfterPaneStateMachine} for firing off a property of the elements in the current pane, + * such as the number of elements that have been assigned to the current pane. + * </ul> + * + * <p>In addition, {@code Trigger}s can be combined in a variety of ways: + * + * <ul> + * <li> {@link RepeatedlyStateMachine#forever} to create a trigger that executes forever. Any time + * its argument finishes it gets reset and starts over. Can be combined with {@link + * TriggerStateMachine#orFinally} to specify a condition that causes the repetition to stop. + * <li> {@link AfterEachStateMachine#inOrder} to execute each trigger in sequence, firing each (and + * every) time that a trigger fires, and advancing to the next trigger in the sequence when it + * finishes. + * <li> {@link AfterFirstStateMachine#of} to create a trigger that fires after at least one of its + * arguments fires. An {@link AfterFirstStateMachine} trigger finishes after it fires once. + * <li> {@link AfterAllStateMachine#of} to create a trigger that fires after all least one of its + * arguments have fired at least once. An {@link AfterAllStateMachine} trigger finishes after it + * fires once. + * </ul> + * + * <p>Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one + * of the following states: + * + * <ul> + * <li> Never Existed - before the trigger has started executing, there is no state associated with + * it anywhere in the system. A trigger moves to the executing state as soon as it processes in + * the current pane. 
+ * <li> Executing - while the trigger is receiving items and may fire. While it is in this state, it + * may persist book-keeping information to persisted state, set timers, etc. + * <li> Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the + * system remembers only that it is finished. Entering this state causes us to discard any + * elements in the buffer for that window, as well. + * </ul> + * + * <p>Once finished, a trigger cannot return itself back to an earlier state, however a composite + * trigger could reset its sub-triggers. + * + * <p>Triggers should not build up any state internally since they may be recreated between + * invocations of the callbacks. All important values should be persisted using state before the + * callback returns. + */ +public abstract class TriggerStateMachine implements Serializable { + + /** + * Interface for accessing information about the trigger being executed and other triggers in the + * same tree. + */ + public interface TriggerInfo { + + /** + * Returns true if the windowing strategy of the current {@code PCollection} is a merging {@link + * WindowFn}. If true, the trigger execution needs to keep enough information to support the + * possibility of {@link TriggerStateMachine#onMerge} being called. If false, {@link + * TriggerStateMachine#onMerge} will never be called. + */ + boolean isMerging(); + + /** + * Access the executable versions of the sub-triggers of the current trigger. + */ + Iterable<ExecutableTriggerStateMachine> subTriggers(); + + /** + * Access the executable version of the specified sub-trigger. + */ + ExecutableTriggerStateMachine subTrigger(int subtriggerIndex); + + /** + * Returns true if the current trigger is marked finished. + */ + boolean isFinished(); + + /** + * Return true if the given subtrigger is marked finished. + */ + boolean isFinished(int subtriggerIndex); + + /** + * Returns true if all the sub-triggers of the current trigger are marked finished. 
+ */ + boolean areAllSubtriggersFinished(); + + /** + * Returns an iterable over the unfinished sub-triggers of the current trigger. + */ + Iterable<ExecutableTriggerStateMachine> unfinishedSubTriggers(); + + /** + * Returns the first unfinished sub-trigger. + */ + ExecutableTriggerStateMachine firstUnfinishedSubTrigger(); + + /** + * Clears all keyed state for triggers in the current sub-tree and unsets all the associated + * finished bits. + */ + void resetTree() throws Exception; + + /** + * Sets the finished bit for the current trigger. + */ + void setFinished(boolean finished); + + /** + * Sets the finished bit for the given sub-trigger. + */ + void setFinished(boolean finished, int subTriggerIndex); + } + + /** + * Interact with properties of the trigger being executed, with extensions to deal with the + * merging windows. + */ + public interface MergingTriggerInfo extends TriggerInfo { + + /** Return true if the trigger is finished in any window being merged. */ + public abstract boolean finishedInAnyMergingWindow(); + + /** Return true if the trigger is finished in all windows being merged. */ + public abstract boolean finishedInAllMergingWindows(); + } + + /** + * Information accessible to all operational hooks in this {@code Trigger}. + * + * <p>Used directly in {@link TriggerStateMachine#shouldFire} and {@link + * TriggerStateMachine#clear}, and extended with additional information in other methods. + */ + public abstract class TriggerContext { + + /** Returns the interface for accessing trigger info. */ + public abstract TriggerInfo trigger(); + + /** Returns the interface for accessing persistent state. */ + public abstract StateAccessor<?> state(); + + /** The window that the current context is executing in. */ + public abstract BoundedWindow window(); + + /** Create a sub-context for the given sub-trigger. 
*/ + public abstract TriggerContext forTrigger(ExecutableTriggerStateMachine trigger); + + /** + * Removes the timer set in this trigger context for the given {@link Instant} + * and {@link TimeDomain}. + */ + public abstract void deleteTimer(Instant timestamp, TimeDomain domain); + + /** The current processing time. */ + public abstract Instant currentProcessingTime(); + + /** The current synchronized upstream processing time or {@code null} if unknown. */ + @Nullable + public abstract Instant currentSynchronizedProcessingTime(); + + /** The current event time for the input or {@code null} if unknown. */ + @Nullable + public abstract Instant currentEventTime(); + } + + /** + * Extended {@link TriggerContext} containing information accessible to the {@link #onElement} + * operational hook. + */ + public abstract class OnElementContext extends TriggerContext { + /** The event timestamp of the element currently being processed. */ + public abstract Instant eventTimestamp(); + + /** + * Sets a timer to fire when the watermark or processing time is beyond the given timestamp. + * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards. + * + * <p>As with {@link #state}, timers are implicitly scoped to the current window. All + * timer firings for a window will be received, but the implementation should choose to ignore + * those that are not applicable. + * + * @param timestamp the time at which the trigger should be re-evaluated + * @param domain the domain that the {@code timestamp} applies to + */ + public abstract void setTimer(Instant timestamp, TimeDomain domain); + + /** Create an {@code OnElementContext} for executing the given trigger. */ + @Override + public abstract OnElementContext forTrigger(ExecutableTriggerStateMachine trigger); + } + + /** + * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge} + * operational hook. 
+ */ + public abstract class OnMergeContext extends TriggerContext { + /** + * Sets a timer to fire when the watermark or processing time is beyond the given timestamp. + * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards. + * + * <p>As with {@link #state}, timers are implicitly scoped to the current window. All + * timer firings for a window will be received, but the implementation should choose to ignore + * those that are not applicable. + * + * @param timestamp the time at which the trigger should be re-evaluated + * @param domain the domain that the {@code timestamp} applies to + */ + public abstract void setTimer(Instant timestamp, TimeDomain domain); + + /** Create an {@code OnMergeContext} for executing the given trigger. */ + @Override + public abstract OnMergeContext forTrigger(ExecutableTriggerStateMachine trigger); + + @Override + public abstract MergingStateAccessor<?, ?> state(); + + @Override + public abstract MergingTriggerInfo trigger(); + } + + @Nullable + protected final List<TriggerStateMachine> subTriggers; + + protected TriggerStateMachine(@Nullable List<TriggerStateMachine> subTriggers) { + this.subTriggers = subTriggers; + } + + + /** + * Called every time an element is incorporated into a window. + */ + public abstract void onElement(OnElementContext c) throws Exception; + + /** + * Called immediately after windows have been merged. + * + * <p>Leaf triggers should update their state by inspecting their status and any state in the + * merging windows. Composite triggers should update their state by calling {@link + * ExecutableTriggerStateMachine#invokeOnMerge} on their sub-triggers, and applying appropriate + * logic. + * + * <p>A trigger such as {@link AfterWatermarkStateMachine#pastEndOfWindow} may no longer be + * finished; it is the responsibility of the trigger itself to record this fact. 
It is forbidden + * for a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending + * elements that led to it being ready to fire. + * + * <p>The implementation does not need to clear out any state associated with the old windows. + */ + public abstract void onMerge(OnMergeContext c) throws Exception; + + /** + * Returns {@code true} if the current state of the trigger indicates that its condition + * is satisfied and it is ready to fire. + */ + public abstract boolean shouldFire(TriggerContext context) throws Exception; + + /** + * Adjusts the state of the trigger to be ready for the next pane. For example, a + * {@link RepeatedlyStateMachine} trigger will reset its inner trigger, since it has fired. + * + * <p>If the trigger is finished, it is the responsibility of the trigger itself to + * record that fact via the {@code context}. + */ + public abstract void onFire(TriggerContext context) throws Exception; + + /** + * Called to allow the trigger to prefetch any state it will likely need to read from during + * an {@link #onElement} call. + */ + public void prefetchOnElement(StateAccessor<?> state) { + if (subTriggers != null) { + for (TriggerStateMachine trigger : subTriggers) { + trigger.prefetchOnElement(state); + } + } + } + + /** + * Called to allow the trigger to prefetch any state it will likely need to read from during + * an {@link #onMerge} call. + */ + public void prefetchOnMerge(MergingStateAccessor<?, ?> state) { + if (subTriggers != null) { + for (TriggerStateMachine trigger : subTriggers) { + trigger.prefetchOnMerge(state); + } + } + } + + /** + * Called to allow the trigger to prefetch any state it will likely need to read from during + * an {@link #shouldFire} call. 
+ */ + public void prefetchShouldFire(StateAccessor<?> state) { + if (subTriggers != null) { + for (TriggerStateMachine trigger : subTriggers) { + trigger.prefetchShouldFire(state); + } + } + } + + /** + * Called to allow the trigger to prefetch any state it will likely need to read from during + * an {@link #onFire} call. + */ + public void prefetchOnFire(StateAccessor<?> state) { + if (subTriggers != null) { + for (TriggerStateMachine trigger : subTriggers) { + trigger.prefetchOnFire(state); + } + } + } + + /** + * Clear any state associated with this trigger in the given window. + * + * <p>This is called after a trigger has indicated it will never fire again. The trigger system + * keeps enough information to know that the trigger is finished, so this trigger should clear all + * of its state. + */ + public void clear(TriggerContext c) throws Exception { + if (subTriggers != null) { + for (ExecutableTriggerStateMachine trigger : c.trigger().subTriggers()) { + trigger.invokeClear(c); + } + } + } + + public Iterable<TriggerStateMachine> subTriggers() { + return subTriggers; + } + + /** + * Returns whether this performs the same triggering as the given {@code Trigger}. + */ + public boolean isCompatible(TriggerStateMachine other) { + if (!getClass().equals(other.getClass())) { + return false; + } + + if (subTriggers == null) { + return other.subTriggers == null; + } else if (other.subTriggers == null) { + return false; + } else if (subTriggers.size() != other.subTriggers.size()) { + return false; + } + + for (int i = 0; i < subTriggers.size(); i++) { + if (!subTriggers.get(i).isCompatible(other.subTriggers.get(i))) { + return false; + } + } + + return true; + } + + @Override + public String toString() { + String simpleName = getClass().getSimpleName(); + if (getClass().getEnclosingClass() != null) { + simpleName = getClass().getEnclosingClass().getSimpleName() + "." 
+ simpleName; + } + if (subTriggers == null || subTriggers.size() == 0) { + return simpleName; + } else { + return simpleName + "(" + Joiner.on(", ").join(subTriggers) + ")"; + } + } + + /** + * Two trigger state machines are equal when they have exactly the same class and equal + * sub-triggers. + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof TriggerStateMachine)) { + return false; + } + TriggerStateMachine that = (TriggerStateMachine) obj; + return Objects.equals(getClass(), that.getClass()) + && Objects.equals(subTriggers, that.subTriggers); + } + + /** Hashes the class together with the sub-triggers, the same components that equals compares. */ + @Override + public int hashCode() { + return Objects.hash(getClass(), subTriggers); + } + + /** + * Specify an ending condition for this trigger. If the {@code until} fires then the combination + * fires. + * + * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes as + * soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time + * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that {@code + * t1} may have fired since {@code t2} started, so not all of the elements that {@code t2} has + * seen are necessarily in the current pane. + * + * <p>For example the final firing of the following trigger may only have 1 element: + * + * <pre>{@code + * Repeatedly.forever(AfterPane.elementCountAtLeast(2)) + * .orFinally(AfterPane.elementCountAtLeast(5)) + * } + * </pre> + * + * <p>Note that if {@code t1} is a {@link OnceTriggerStateMachine}, then {@code t1.orFinally(t2)} + * is the same as {@code AfterFirst.of(t1, t2)}. + */ + public TriggerStateMachine orFinally(OnceTriggerStateMachine until) { + return new OrFinallyStateMachine(this, until); + } + + /** + * {@link TriggerStateMachine}s that are guaranteed to fire at most once should extend from this, + * rather than the general {@link TriggerStateMachine} class to indicate that behavior. 
+ */ + public abstract static class OnceTriggerStateMachine extends TriggerStateMachine { + protected OnceTriggerStateMachine(List<TriggerStateMachine> subTriggers) { + super(subTriggers); + } + + /** + * {@inheritDoc} + * + * <p>Delegates to {@link #onOnlyFiring} and then marks this trigger as finished. + */ + @Override + public final void onFire(TriggerContext context) throws Exception { + onOnlyFiring(context); + context.trigger().setFinished(true); + } + + /** + * Called by {@link #onFire} when the trigger is fired, immediately before {@link #onFire} marks + * this trigger as finished. Subclasses must implement it; there is no default behavior. + */ + protected abstract void onOnlyFiring(TriggerContext context) throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java new file mode 100644 index 0000000..1c06e8d --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import java.util.Collection; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.MergingTriggerInfo; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.TriggerInfo; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.ActiveWindowSet; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timers; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateTag; +import org.joda.time.Instant; + +/** + * Factory for creating instances of the various {@link TriggerStateMachine} contexts. + * + * <p>These contexts are highly interdependent and share many fields; it is inadvisable + * to create them via any means other than this factory class. 
+ */ +public class TriggerStateMachineContextFactory<W extends BoundedWindow> { + + private final WindowFn<?, W> windowFn; + private final StateInternals<?> stateInternals; + private final Coder<W> windowCoder; + + /** + * Creates a factory for the given window function and state. + * + * @param windowFn supplies the window coder and tells whether windows are merging + * @param stateInternals the state that the created accessors and contexts read from + * @param activeWindows currently unused + */ + public TriggerStateMachineContextFactory(WindowFn<?, W> windowFn, + StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) { + // Future triggers may be able to exploit the active window to state address window mapping. + this.windowFn = windowFn; + this.stateInternals = stateInternals; + this.windowCoder = windowFn.windowCoder(); + } + + /** + * Creates a basic trigger context for {@code rootTrigger} in {@code window}, using + * {@code finishedSet} to track which triggers are finished. + */ + public TriggerStateMachine.TriggerContext base(W window, Timers timers, + ExecutableTriggerStateMachine rootTrigger, FinishedTriggers finishedSet) { + return new TriggerContextImpl(window, timers, rootTrigger, finishedSet); + } + + /** + * Creates the context used while processing an element with timestamp {@code elementTimestamp} + * for {@code rootTrigger} in {@code window}. + */ + public TriggerStateMachine.OnElementContext createOnElementContext( + W window, Timers timers, Instant elementTimestamp, + ExecutableTriggerStateMachine rootTrigger, FinishedTriggers finishedSet) { + return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp); + } + + /** + * Creates the context used while merging windows into {@code window}. The keys of + * {@code finishedSets} are taken as the windows being merged; each value is that window's + * finished-trigger set. + */ + public TriggerStateMachine.OnMergeContext createOnMergeContext(W window, Timers timers, + ExecutableTriggerStateMachine rootTrigger, FinishedTriggers finishedSet, + Map<W, FinishedTriggers> finishedSets) { + return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets); + } + + /** Creates an accessor for the state of {@code trigger} in {@code window}. */ + public StateAccessor<?> createStateAccessor(W window, ExecutableTriggerStateMachine trigger) { + return new StateAccessorImpl(window, trigger); + } + + /** + * Creates an accessor for the state of {@code trigger} in {@code mergeResult} that can also reach + * the state of each window in {@code mergingWindows}. + */ + public MergingStateAccessor<?, W> createMergingStateAccessor( + W mergeResult, Collection<W> mergingWindows, ExecutableTriggerStateMachine trigger) { + return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult); + } + + /** Trigger info backed by one executable trigger and a finished-trigger set. */ + private class TriggerInfoImpl implements TriggerStateMachine.TriggerInfo { + + protected final ExecutableTriggerStateMachine trigger; + protected final FinishedTriggers finishedSet; + private final 
TriggerStateMachine.TriggerContext context; + + /** + * @param trigger the trigger this info describes + * @param finishedSet where finished state is read and recorded + * @param context the context passed to the trigger when {@link #resetTree} clears it + */ + public TriggerInfoImpl(ExecutableTriggerStateMachine trigger, FinishedTriggers finishedSet, + TriggerStateMachine.TriggerContext context) { + this.trigger = trigger; + this.finishedSet = finishedSet; + this.context = context; + } + + /** Returns {@code true} unless the factory's window function reports itself as non-merging. */ + @Override + public boolean isMerging() { + return !windowFn.isNonMerging(); + } + + @Override + public Iterable<ExecutableTriggerStateMachine> subTriggers() { + return trigger.subTriggers(); + } + + @Override + public ExecutableTriggerStateMachine subTrigger(int subtriggerIndex) { + return trigger.subTriggers().get(subtriggerIndex); + } + + @Override + public boolean isFinished() { + return finishedSet.isFinished(trigger); + } + + @Override + public boolean isFinished(int subtriggerIndex) { + return finishedSet.isFinished(subTrigger(subtriggerIndex)); + } + + @Override + public boolean areAllSubtriggersFinished() { + return Iterables.isEmpty(unfinishedSubTriggers()); + } + + /** Returns the sub-triggers that the finished set does not report as finished. */ + @Override + public Iterable<ExecutableTriggerStateMachine> unfinishedSubTriggers() { + return FluentIterable + .from(trigger.subTriggers()) + .filter(new Predicate<ExecutableTriggerStateMachine>() { + @Override + public boolean apply(ExecutableTriggerStateMachine trigger) { + return !finishedSet.isFinished(trigger); + } + }); + } + + /** + * Returns the first sub-trigger that is not finished, or {@code null} when every sub-trigger is + * finished. + */ + @Override + public ExecutableTriggerStateMachine firstUnfinishedSubTrigger() { + for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) { + if (!finishedSet.isFinished(subTrigger)) { + return subTrigger; + } + } + return null; + } + + /** + * Calls {@code clearRecursively} on the finished set for this trigger and then invokes the + * trigger's clear with the owning context. + */ + @Override + public void resetTree() throws Exception { + finishedSet.clearRecursively(trigger); + trigger.invokeClear(context); + } + + @Override + public void setFinished(boolean finished) { + finishedSet.setFinished(trigger, finished); + } + + @Override + public void setFinished(boolean finished, int subTriggerIndex) { + finishedSet.setFinished(subTrigger(subTriggerIndex), finished); + } + } + + /** + * Timers decorator bound to a window. It forwards every call to the wrapped timers, except that + * it refuses to delete the event-time timer at the window's maximum timestamp. + */ + private class TriggerTimers implements Timers 
{ + + private final Timers timers; + private final W window; + + public TriggerTimers(W window, Timers timers) { + this.timers = timers; + this.window = window; + } + + @Override + public void setTimer(Instant timestamp, TimeDomain timeDomain) { + timers.setTimer(timestamp, timeDomain); + } + + /** + * Deletes the timer through the wrapped timers, unless it is the event-time timer at the + * window's maximum timestamp, in which case the request is ignored. + */ + @Override + public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { + if (timeDomain == TimeDomain.EVENT_TIME + && timestamp.equals(window.maxTimestamp())) { + // Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time + // state transitions. + return; + } + timers.deleteTimer(timestamp, timeDomain); + } + + @Override + public Instant currentProcessingTime() { + return timers.currentProcessingTime(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timers.currentSynchronizedProcessingTime(); + } + + @Override + public Instant currentEventTime() { + return timers.currentEventTime(); + } + } + + /** + * Trigger info for a merge: besides the finished set of the merge result it holds the finished + * set of each merging window. + */ + private class MergingTriggerInfoImpl + extends TriggerInfoImpl implements TriggerStateMachine.MergingTriggerInfo { + + private final Map<W, FinishedTriggers> finishedSets; + + public MergingTriggerInfoImpl( + ExecutableTriggerStateMachine trigger, + FinishedTriggers finishedSet, + TriggerStateMachine.TriggerContext context, + Map<W, FinishedTriggers> finishedSets) { + super(trigger, finishedSet, context); + this.finishedSets = finishedSets; + } + + /** Returns {@code true} if this trigger is finished in at least one merging window. */ + @Override + public boolean finishedInAnyMergingWindow() { + for (FinishedTriggers finishedSet : finishedSets.values()) { + if (finishedSet.isFinished(trigger)) { + return true; + } + } + return false; + } + + /** + * Returns {@code true} if this trigger is finished in every merging window; this is also the + * result when there are no merging windows. + */ + @Override + public boolean finishedInAllMergingWindows() { + for (FinishedTriggers finishedSet : finishedSets.values()) { + if (!finishedSet.isFinished(trigger)) { + return false; + } + } + return true; + } + } + + /** State accessor that resolves state in the namespace of one window and one trigger index. */ + private class StateAccessorImpl implements StateAccessor<Object> { + protected final int triggerIndex; + protected final StateNamespace 
windowNamespace; + + public StateAccessorImpl( + W window, + ExecutableTriggerStateMachine trigger) { + /* triggerIndex has to be assigned first because namespaceFor(window) reads it. */ + this.triggerIndex = trigger.getTriggerIndex(); + this.windowNamespace = namespaceFor(window); + } + + /** + * Builds the state namespace for {@code window} combined with this accessor's trigger index, + * using the factory's window coder. + */ + protected StateNamespace namespaceFor(W window) { + return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex); + } + + /** Looks up {@code address} in the namespace of this accessor's window and trigger. */ + @Override + public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) { + return stateInternals.state(windowNamespace, address); + } + } + + /** + * State accessor for a merge. Its own namespace is that of the merge result window, and it can + * additionally look up state in each of the windows being merged. + */ + private class MergingStateAccessorImpl extends StateAccessorImpl + implements MergingStateAccessor<Object, W> { + private final Collection<W> activeToBeMerged; + + public MergingStateAccessorImpl( + ExecutableTriggerStateMachine trigger, Collection<W> activeToBeMerged, W mergeResult) { + super(mergeResult, trigger); + this.activeToBeMerged = activeToBeMerged; + } + + /** + * Looks up {@code address} in the merge result window's namespace; the body is the same as the + * inherited implementation. + */ + @Override + public <StateT extends State> StateT access( + StateTag<? super Object, StateT> address) { + return stateInternals.state(windowNamespace, address); + } + + /** + * Returns an immutable map from each window being merged to the state stored at {@code address} + * in that window's namespace for this accessor's trigger index. + */ + @Override + public <StateT extends State> Map<W, StateT> accessInEachMergingWindow( + StateTag<? 
super Object, StateT> address) { + ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder(); + for (W mergingWindow : activeToBeMerged) { + StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address); + builder.put(mergingWindow, stateForWindow); + } + return builder.build(); + } + } + + /** + * Basic trigger context for one trigger in one window. It exposes the window, the trigger's + * state and info, the current times, and timer deletion. + */ + private class TriggerContextImpl extends TriggerStateMachine.TriggerContext { + + private final W window; + private final StateAccessorImpl state; + private final Timers timers; + private final TriggerInfoImpl triggerInfo; + + private TriggerContextImpl( + W window, + Timers timers, + ExecutableTriggerStateMachine trigger, + FinishedTriggers finishedSet) { + /* Qualified superclass constructor call: the trigger's spec is the enclosing instance. */ + trigger.getSpec().super(); + this.window = window; + this.state = new StateAccessorImpl(window, trigger); + this.timers = new TriggerTimers(window, timers); + this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this); + } + + /** + * Returns a context for {@code trigger} that shares this context's window and finished set. + * The timers handed over are already wrapped, so the new context wraps them a second time. + */ + @Override + public TriggerStateMachine.TriggerContext forTrigger(ExecutableTriggerStateMachine trigger) { + return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet); + } + + @Override + public TriggerInfo trigger() { + return triggerInfo; + } + + @Override + public StateAccessor<?> state() { + return state; + } + + @Override + public W window() { + return window; + } + + @Override + public void deleteTimer(Instant timestamp, TimeDomain domain) { + timers.deleteTimer(timestamp, domain); + } + + @Override + public Instant currentProcessingTime() { + return timers.currentProcessingTime(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timers.currentSynchronizedProcessingTime(); + } + + @Override + @Nullable + public Instant currentEventTime() { + return timers.currentEventTime(); + } + } + + /** + * Context used while a trigger processes an element. In addition to what the basic context + * offers, it carries the element's timestamp and allows timers to be set. + */ + private class OnElementContextImpl extends TriggerStateMachine.OnElementContext { + + private final W window; + private final StateAccessorImpl state; + private final Timers timers; + private final TriggerInfoImpl 
triggerInfo; + private final Instant eventTimestamp; + + private OnElementContextImpl( + W window, + Timers timers, + ExecutableTriggerStateMachine trigger, + FinishedTriggers finishedSet, + Instant eventTimestamp) { + /* Qualified superclass constructor call: the trigger's spec is the enclosing instance. */ + trigger.getSpec().super(); + this.window = window; + this.state = new StateAccessorImpl(window, trigger); + this.timers = new TriggerTimers(window, timers); + this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this); + this.eventTimestamp = eventTimestamp; + } + + + @Override + public Instant eventTimestamp() { + return eventTimestamp; + } + + /** + * Returns a context for {@code trigger} that shares this context's window, finished set and + * element timestamp. + */ + @Override + public TriggerStateMachine.OnElementContext forTrigger(ExecutableTriggerStateMachine trigger) { + return new OnElementContextImpl( + window, timers, trigger, triggerInfo.finishedSet, eventTimestamp); + } + + @Override + public TriggerInfo trigger() { + return triggerInfo; + } + + @Override + public StateAccessor<?> state() { + return state; + } + + @Override + public W window() { + return window; + } + + @Override + public void setTimer(Instant timestamp, TimeDomain domain) { + timers.setTimer(timestamp, domain); + } + + + @Override + public void deleteTimer(Instant timestamp, TimeDomain domain) { + timers.deleteTimer(timestamp, domain); + } + + @Override + public Instant currentProcessingTime() { + return timers.currentProcessingTime(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timers.currentSynchronizedProcessingTime(); + } + + @Override + @Nullable + public Instant currentEventTime() { + return timers.currentEventTime(); + } + } + + /** + * Context used while windows are merged. The windows being merged are the keys of the map of + * finished sets passed to the constructor. + */ + private class OnMergeContextImpl extends TriggerStateMachine.OnMergeContext { + private final MergingStateAccessor<?, W> state; + private final W window; + private final Collection<W> mergingWindows; + private final Timers timers; + private final MergingTriggerInfoImpl triggerInfo; + + private OnMergeContextImpl( + W window, + Timers timers, + ExecutableTriggerStateMachine trigger, + FinishedTriggers 
finishedSet, + Map<W, FinishedTriggers> finishedSets) { + trigger.getSpec().super(); + this.mergingWindows = finishedSets.keySet(); + this.window = window; + this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window); + this.timers = new TriggerTimers(window, timers); + this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets); + } + + @Override + public TriggerStateMachine.OnMergeContext forTrigger(ExecutableTriggerStateMachine trigger) { + return new OnMergeContextImpl( + window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets); + } + + @Override + public MergingStateAccessor<?, W> state() { + return state; + } + + @Override + public MergingTriggerInfo trigger() { + return triggerInfo; + } + + @Override + public W window() { + return window; + } + + @Override + public void setTimer(Instant timestamp, TimeDomain domain) { + timers.setTimer(timestamp, domain); + } + + @Override + public void deleteTimer(Instant timestamp, TimeDomain domain) { + /* Must delete: this used to call setTimer, which set the timer instead of removing it. */ + timers.deleteTimer(timestamp, domain); + } + + @Override + public Instant currentProcessingTime() { + return timers.currentProcessingTime(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timers.currentSynchronizedProcessingTime(); + } + + @Override + @Nullable + public Instant currentEventTime() { + return timers.currentEventTime(); + } + } +}