Add TriggerStateMachines with conversion from Trigger
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/00672961 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/00672961 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/00672961 Branch: refs/heads/master Commit: 00672961b5a3115c298c457dfe43f543947298a0 Parents: 2107f79 Author: Kenneth Knowles <k...@google.com> Authored: Thu Oct 13 20:02:52 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Oct 17 19:56:49 2016 -0700 ---------------------------------------------------------------------- .../core/triggers/TriggerStateMachines.java | 210 +++++++++++++++++++ .../core/triggers/TriggerStateMachinesTest.java | 199 ++++++++++++++++++ 2 files changed, 409 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00672961/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java new file mode 100644 index 0000000..317e3b9 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.AfterAll; +import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement; +import org.apache.beam.sdk.transforms.windowing.AfterEach; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; +import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Instant; + +/** Translates a {@link Trigger} to a {@link TriggerStateMachine}. */ +public class TriggerStateMachines { + + private TriggerStateMachines() {} + + @VisibleForTesting static final StateMachineConverter CONVERTER = new StateMachineConverter(); + + public static TriggerStateMachine stateMachineForTrigger(Trigger trigger) { + return CONVERTER.evaluateTrigger(trigger); + } + + public static OnceTriggerStateMachine stateMachineForOnceTrigger(OnceTrigger trigger) { + return CONVERTER.evaluateOnceTrigger(trigger); + } + + @VisibleForTesting + static class StateMachineConverter { + + public TriggerStateMachine evaluateTrigger(Trigger trigger) { + Method evaluationMethod = getEvaluationMethod(trigger.getClass()); + return tryEvaluate(evaluationMethod, trigger); + } + + public OnceTriggerStateMachine evaluateOnceTrigger(OnceTrigger trigger) { + Method evaluationMethod = getEvaluationMethod(trigger.getClass()); + return (OnceTriggerStateMachine) tryEvaluate(evaluationMethod, trigger); + } + + private TriggerStateMachine tryEvaluate(Method evaluationMethod, Trigger trigger) { + try { + return (TriggerStateMachine) evaluationMethod.invoke(this, trigger); + } catch (InvocationTargetException exc) { + if (exc.getCause() instanceof RuntimeException) { + throw (RuntimeException) exc.getCause(); + } else { + throw new RuntimeException(exc.getCause()); + } + } catch (IllegalAccessException exc) { + throw new IllegalStateException( + String.format("Internal error: could not invoke %s", evaluationMethod)); + } + } + + private Method getEvaluationMethod(Class<?> clazz) { + Method evaluationMethod; + try { + return getClass().getDeclaredMethod("evaluateSpecific", clazz); + } catch (NoSuchMethodException exc) { + throw new UnsupportedOperationException( + String.format( + "Cannot translate trigger class %s to a state machine.", clazz.getCanonicalName()), + exc); + } + } + + private TriggerStateMachine evaluateSpecific(DefaultTrigger v) { + return DefaultTriggerStateMachine.of(); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterWatermark.FromEndOfWindow v) { + return AfterWatermarkStateMachine.pastEndOfWindow(); + } + + private OnceTriggerStateMachine evaluateSpecific(NeverTrigger v) { + return NeverStateMachine.ever(); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterSynchronizedProcessingTime v) { + return new AfterSynchronizedProcessingTimeStateMachine(); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterFirst v) { + List<OnceTriggerStateMachine> subStateMachines = + Lists.newArrayListWithCapacity(v.subTriggers().size()); + for (Trigger subtrigger : v.subTriggers()) { + subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger)); + } + return AfterFirstStateMachine.of(subStateMachines); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterAll v) { + List<OnceTriggerStateMachine> subStateMachines = + Lists.newArrayListWithCapacity(v.subTriggers().size()); + for (Trigger subtrigger : v.subTriggers()) { + subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger)); + } + return AfterAllStateMachine.of(subStateMachines); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterPane v) { + return AfterPaneStateMachine.elementCountAtLeast(v.getElementCount()); + } + + private TriggerStateMachine evaluateSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) { + AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine = + AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(stateMachineForOnceTrigger(v.getEarlyTrigger())); + + if (v.getLateTrigger() != null) { + machine = machine.withLateFirings(stateMachineForOnceTrigger(v.getLateTrigger())); + } + return machine; + } + + private TriggerStateMachine evaluateSpecific(AfterEach v) { + List<TriggerStateMachine> subStateMachines = + Lists.newArrayListWithCapacity(v.subTriggers().size()); + + for (Trigger subtrigger : v.subTriggers()) { + subStateMachines.add(stateMachineForTrigger(subtrigger)); + } + + return AfterEachStateMachine.inOrder(subStateMachines); + } + + private TriggerStateMachine evaluateSpecific(Repeatedly v) { + return RepeatedlyStateMachine.forever(stateMachineForTrigger(v.getRepeatedTrigger())); + } + + private TriggerStateMachine evaluateSpecific(OrFinallyTrigger v) { + return new OrFinallyStateMachine( + stateMachineForTrigger(v.getMainTrigger()), + stateMachineForOnceTrigger(v.getUntilTrigger())); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterProcessingTime v) { + return evaluateSpecific((AfterDelayFromFirstElement) v); + } + + private OnceTriggerStateMachine evaluateSpecific(final AfterDelayFromFirstElement v) { + return new AfterDelayFromFirstElementStateMachineAdapter(v); + } + + private static class AfterDelayFromFirstElementStateMachineAdapter + extends AfterDelayFromFirstElementStateMachine { + + public AfterDelayFromFirstElementStateMachineAdapter(AfterDelayFromFirstElement v) { + this(v.getTimeDomain(), v.getTimestampMappers()); + } + + private AfterDelayFromFirstElementStateMachineAdapter( + TimeDomain timeDomain, List<SerializableFunction<Instant, Instant>> timestampMappers) { + super(timeDomain, timestampMappers); + } + + @Override + public Instant getCurrentTime(TriggerContext context) { + switch (timeDomain) { + case PROCESSING_TIME: + return context.currentProcessingTime(); + case SYNCHRONIZED_PROCESSING_TIME: + return context.currentSynchronizedProcessingTime(); + case EVENT_TIME: + return context.currentEventTime(); + default: + throw new IllegalArgumentException("A time domain that doesn't exist was received!"); + } + } + + @Override + protected AfterDelayFromFirstElementStateMachine newWith( + List<SerializableFunction<Instant, Instant>> transform) { + return new AfterDelayFromFirstElementStateMachineAdapter(timeDomain, transform); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00672961/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java new file mode 100644 index 0000000..37f8f10 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.sdk.transforms.windowing.AfterAll; +import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement; +import org.apache.beam.sdk.transforms.windowing.AfterEach; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.Never; +import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; +import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests the {@link TriggerStateMachines} static utility methods. */ +@RunWith(JUnit4.class) +public class TriggerStateMachinesTest { + + // + // Tests for leaf trigger translation + // + + @Test + public void testStateMachineForAfterPane() { + int count = 37; + AfterPane trigger = AfterPane.elementCountAtLeast(count); + AfterPaneStateMachine machine = + (AfterPaneStateMachine) TriggerStateMachines.stateMachineForOnceTrigger(trigger); + + assertThat(machine.getElementCount(), equalTo(trigger.getElementCount())); + } + + @Test + public void testStateMachineForAfterProcessingTime() { + Duration minutes = Duration.standardMinutes(94); + Duration hours = Duration.standardHours(13); + + AfterDelayFromFirstElement trigger = + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(minutes).alignedTo(hours); + + AfterDelayFromFirstElementStateMachine machine = + (AfterDelayFromFirstElementStateMachine) + TriggerStateMachines.stateMachineForOnceTrigger(trigger); + + assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME)); + + // This equality is function equality, but due to the structure of the code (no serialization) + // it is OK to check + assertThat(machine.getTimestampMappers(), equalTo(trigger.getTimestampMappers())); + } + + @Test + public void testStateMachineForAfterWatermark() { + AfterWatermark.FromEndOfWindow trigger = AfterWatermark.pastEndOfWindow(); + AfterWatermarkStateMachine.FromEndOfWindow machine = + (AfterWatermarkStateMachine.FromEndOfWindow) + TriggerStateMachines.stateMachineForOnceTrigger(trigger); + // No parameters, so if it doesn't crash, we win! + } + + @Test + public void testDefaultTriggerTranslation() { + DefaultTrigger trigger = DefaultTrigger.of(); + DefaultTriggerStateMachine machine = + (DefaultTriggerStateMachine) + checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger)); + // No parameters, so if it doesn't crash, we win! + } + + @Test + public void testNeverTranslation() { + NeverTrigger trigger = Never.ever(); + NeverStateMachine machine = + (NeverStateMachine) checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger)); + // No parameters, so if it doesn't crash, we win! + } + + // + // Tests for composite trigger translation + // + // These check just that translation was invoked recursively using somewhat random + // leaf subtriggers; by induction it all holds together. Beyond this, explicit tests + // of particular triggers will suffice. + + private static final int ELEM_COUNT = 472; + private static final Duration DELAY = Duration.standardSeconds(95673); + + private final OnceTrigger subtrigger1 = AfterPane.elementCountAtLeast(ELEM_COUNT); + private final OnceTrigger subtrigger2 = + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DELAY); + + private final OnceTriggerStateMachine submachine1 = + TriggerStateMachines.stateMachineForOnceTrigger(subtrigger1); + private final OnceTriggerStateMachine submachine2 = + TriggerStateMachines.stateMachineForOnceTrigger(subtrigger2); + + @Test + public void testAfterEachTranslation() { + AfterEach trigger = AfterEach.inOrder(subtrigger1, subtrigger2); + AfterEachStateMachine machine = + (AfterEachStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat(machine, equalTo(AfterEachStateMachine.inOrder(submachine1, submachine2))); + } + + @Test + public void testAfterFirstTranslation() { + AfterFirst trigger = AfterFirst.of(subtrigger1, subtrigger2); + AfterFirstStateMachine machine = + (AfterFirstStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat(machine, equalTo(AfterFirstStateMachine.of(submachine1, submachine2))); + } + + @Test + public void testAfterAllTranslation() { + AfterAll trigger = AfterAll.of(subtrigger1, subtrigger2); + AfterAllStateMachine machine = + (AfterAllStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat(machine, equalTo(AfterAllStateMachine.of(submachine1, submachine2))); + } + + @Test + public void testAfterWatermarkEarlyTranslation() { + AfterWatermark.AfterWatermarkEarlyAndLate trigger = + AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1); + AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine = + (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate) + TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat( + machine, + equalTo(AfterWatermarkStateMachine.pastEndOfWindow().withEarlyFirings(submachine1))); + } + + @Test + public void testAfterWatermarkEarlyLateTranslation() { + AfterWatermark.AfterWatermarkEarlyAndLate trigger = + AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1).withLateFirings(subtrigger2); + AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine = + (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate) + TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat( + machine, + equalTo( + AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(submachine1) + .withLateFirings(submachine2))); + } + + @Test + public void testOrFinallyTranslation() { + OrFinallyTrigger trigger = subtrigger1.orFinally(subtrigger2); + OrFinallyStateMachine machine = + (OrFinallyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat(machine, equalTo(submachine1.orFinally(submachine2))); + } + + @Test + public void testRepeatedlyTranslation() { + Repeatedly trigger = Repeatedly.forever(subtrigger1); + RepeatedlyStateMachine machine = + (RepeatedlyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat(machine, equalTo(RepeatedlyStateMachine.forever(submachine1))); + } +}