Build trigger state machine from Runner API Trigger proto directly

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27a482ba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27a482ba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27a482ba

Branch: refs/heads/master
Commit: 27a482baf498bab8c931670adec5134c0bdf08ac
Parents: bd8b72c
Author: Kenneth Knowles <k...@google.com>
Authored: Fri Feb 17 16:05:13 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Feb 24 07:16:36 2017 -0800

----------------------------------------------------------------------
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |   4 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   4 +-
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |   4 +-
 .../core/triggers/AfterAllStateMachine.java     |   2 +-
 .../core/triggers/AfterFirstStateMachine.java   |   2 +-
 .../triggers/AfterWatermarkStateMachine.java    |  14 +-
 .../core/triggers/OrFinallyStateMachine.java    |   2 +-
 .../core/triggers/TriggerStateMachine.java      |   2 +-
 .../core/triggers/TriggerStateMachines.java     | 272 ++++++-------------
 .../beam/runners/core/ReduceFnTester.java       |   8 +-
 .../core/triggers/TriggerStateMachinesTest.java | 160 +++++++----
 .../GroupAlsoByWindowEvaluatorFactory.java      |   4 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |  40 +--
 13 files changed, 243 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index 5b2e051..e3ce1ef 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.joda.time.Instant;
@@ -63,7 +64,8 @@ public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends
             key,
             strategy,
             ExecutableTriggerStateMachine.create(
-                TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger())),
+                TriggerStateMachines.stateMachineForTrigger(
+                    Triggers.toProto(strategy.getTrigger()))),
             stateInternals,
             timerInternals,
             WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index ac0b1ab..8dc1502 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -77,7 +78,8 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
             key,
             windowingStrategy,
             ExecutableTriggerStateMachine.create(
-                TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
+                TriggerStateMachines.stateMachineForTrigger(
+                    Triggers.toProto(windowingStrategy.getTrigger()))),
             stateInternals,
             timerInternals,
             WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 7b65a0b..444f8fe 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -128,7 +129,8 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
             key,
             windowingStrategy,
             ExecutableTriggerStateMachine.create(
-                TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
+                TriggerStateMachines.stateMachineForTrigger(
+                    Triggers.toProto(windowingStrategy.getTrigger()))),
             stateInternals,
             timerInternals,
             outputWindowedValue(),

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
index 12cbc3d..0f0c17c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
@@ -42,7 +42,7 @@ public class AfterAllStateMachine extends 
OnceTriggerStateMachine {
    * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
    */
   @SafeVarargs
-  public static OnceTriggerStateMachine of(OnceTriggerStateMachine... 
triggers) {
+  public static OnceTriggerStateMachine of(TriggerStateMachine... triggers) {
     return new 
AfterAllStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
index f4b305e..840a65c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
@@ -43,7 +43,7 @@ public class AfterFirstStateMachine extends 
OnceTriggerStateMachine {
    */
   @SafeVarargs
   public static OnceTriggerStateMachine of(
-      OnceTriggerStateMachine... triggers) {
+      TriggerStateMachine... triggers) {
     return new 
AfterFirstStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
index e83c2f8..0b12005 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
@@ -78,13 +78,13 @@ public class AfterWatermarkStateMachine {
     private static final int EARLY_INDEX = 0;
     private static final int LATE_INDEX = 1;
 
-    private final OnceTriggerStateMachine earlyTrigger;
+    private final TriggerStateMachine earlyTrigger;
     @Nullable
-    private final OnceTriggerStateMachine lateTrigger;
+    private final TriggerStateMachine lateTrigger;
 
     @SuppressWarnings("unchecked")
     private AfterWatermarkEarlyAndLate(
-        OnceTriggerStateMachine earlyTrigger, OnceTriggerStateMachine 
lateTrigger) {
+        TriggerStateMachine earlyTrigger, TriggerStateMachine lateTrigger) {
       super(
           lateTrigger == null
               ? ImmutableList.<TriggerStateMachine>of(earlyTrigger)
@@ -93,11 +93,11 @@ public class AfterWatermarkStateMachine {
       this.lateTrigger = lateTrigger;
     }
 
-    public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTriggerStateMachine 
earlyTrigger) {
+    public AfterWatermarkEarlyAndLate withEarlyFirings(TriggerStateMachine 
earlyTrigger) {
       return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
     }
 
-    public AfterWatermarkEarlyAndLate withLateFirings(OnceTriggerStateMachine 
lateTrigger) {
+    public AfterWatermarkEarlyAndLate withLateFirings(TriggerStateMachine 
lateTrigger) {
       return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
     }
 
@@ -254,7 +254,7 @@ public class AfterWatermarkStateMachine {
      * Creates a new {@code Trigger} like the this, except that it fires 
repeatedly whenever
      * the given {@code Trigger} fires before the watermark has passed the end 
of the window.
      */
-    public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTriggerStateMachine 
earlyFirings) {
+    public AfterWatermarkEarlyAndLate withEarlyFirings(TriggerStateMachine 
earlyFirings) {
       checkNotNull(earlyFirings, "Must specify the trigger to use for early 
firings");
       return new AfterWatermarkEarlyAndLate(earlyFirings, null);
     }
@@ -263,7 +263,7 @@ public class AfterWatermarkStateMachine {
      * Creates a new {@code Trigger} like the this, except that it fires 
repeatedly whenever
      * the given {@code Trigger} fires after the watermark has passed the end 
of the window.
      */
-    public AfterWatermarkEarlyAndLate withLateFirings(OnceTriggerStateMachine 
lateFirings) {
+    public AfterWatermarkEarlyAndLate withLateFirings(TriggerStateMachine 
lateFirings) {
       checkNotNull(lateFirings, "Must specify the trigger to use for late 
firings");
       return new AfterWatermarkEarlyAndLate(NeverStateMachine.ever(), 
lateFirings);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java
index f9ec5e7..e3bfb4f 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java
@@ -29,7 +29,7 @@ class OrFinallyStateMachine extends TriggerStateMachine {
   private static final int UNTIL = 1;
 
   @VisibleForTesting
-  OrFinallyStateMachine(TriggerStateMachine actual, OnceTriggerStateMachine 
until) {
+  OrFinallyStateMachine(TriggerStateMachine actual, TriggerStateMachine until) 
{
     super(Arrays.asList(actual, until));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
index 8b8d737..d622ac9 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
@@ -456,7 +456,7 @@ public abstract class TriggerStateMachine implements 
Serializable {
    * <p>Note that if {@code t1} is {@link OnceTriggerStateMachine}, then 
{@code t1.orFinally(t2)} is
    * the same as {@code AfterFirst.of(t1, t2)}.
    */
-  public TriggerStateMachine orFinally(OnceTriggerStateMachine until) {
+  public TriggerStateMachine orFinally(TriggerStateMachine until) {
     return new OrFinallyStateMachine(this, until);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
index b13ac40..1088435 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
@@ -17,32 +17,11 @@
  */
 package org.apache.beam.runners.core.triggers;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Lists;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.List;
-import javax.annotation.Nonnull;
-import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.AfterAll;
-import org.apache.beam.sdk.transforms.windowing.AfterEach;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import 
org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
-import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.TimestampTransform;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ReshuffleTrigger;
-import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /** Translates a {@link Trigger} to a {@link TriggerStateMachine}. */
@@ -50,187 +29,96 @@ public class TriggerStateMachines {
 
   private TriggerStateMachines() {}
 
-  @VisibleForTesting static final StateMachineConverter CONVERTER = new 
StateMachineConverter();
-
-  public static TriggerStateMachine stateMachineForTrigger(Trigger trigger) {
-    return CONVERTER.evaluateTrigger(trigger);
-  }
-
-  public static OnceTriggerStateMachine stateMachineForOnceTrigger(OnceTrigger 
trigger) {
-    return CONVERTER.evaluateOnceTrigger(trigger);
-  }
-
-  @VisibleForTesting
-  static class StateMachineConverter {
-
-    public TriggerStateMachine evaluateTrigger(Trigger trigger) {
-      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
-      return tryEvaluate(evaluationMethod, trigger);
-    }
-
-    public OnceTriggerStateMachine evaluateOnceTrigger(OnceTrigger trigger) {
-      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
-      return (OnceTriggerStateMachine) tryEvaluate(evaluationMethod, trigger);
-    }
-
-    private TriggerStateMachine tryEvaluate(Method evaluationMethod, Trigger 
trigger) {
-      try {
-        return (TriggerStateMachine) evaluationMethod.invoke(this, trigger);
-      } catch (InvocationTargetException exc) {
-        if (exc.getCause() instanceof RuntimeException) {
-          throw (RuntimeException) exc.getCause();
-        } else {
-          throw new RuntimeException(exc.getCause());
-        }
-      } catch (IllegalAccessException exc) {
-        throw new IllegalStateException(
-            String.format("Internal error: could not invoke %s", 
evaluationMethod));
-      }
-    }
-
-    private Method getEvaluationMethod(Class<?> clazz) {
-      try {
-        return getClass().getDeclaredMethod("evaluateSpecific", clazz);
-      } catch (NoSuchMethodException exc) {
-        throw new UnsupportedOperationException(
-            String.format(
-                "Cannot translate trigger class %s to a state machine.", 
clazz.getCanonicalName()),
-            exc);
-      }
-    }
-
-    private TriggerStateMachine evaluateSpecific(DefaultTrigger v) {
-      return DefaultTriggerStateMachine.of();
-    }
-
-    private TriggerStateMachine evaluateSpecific(ReshuffleTrigger v) {
-      return new ReshuffleTriggerStateMachine();
+  public static TriggerStateMachine stateMachineForTrigger(RunnerApi.Trigger trigger) {
+    switch (trigger.getTriggerCase()) {
+      case AFTER_ALL:
+        return AfterAllStateMachine.of(
+            stateMachinesForTriggers(trigger.getAfterAll().getSubtriggersList()));
+      case AFTER_ANY:
+        return AfterFirstStateMachine.of(
+            stateMachinesForTriggers(trigger.getAfterAny().getSubtriggersList()));
+      case AFTER_END_OF_WIDOW:
+        return stateMachineForAfterEndOfWindow(trigger.getAfterEndOfWidow());
+      case ELEMENT_COUNT:
+        return AfterPaneStateMachine.elementCountAtLeast(
+            trigger.getElementCount().getElementCount());
+      case AFTER_SYNCHRONIZED_PROCESSING_TIME:
+        return AfterSynchronizedProcessingTimeStateMachine.ofFirstElement();
+      case DEFAULT:
+        return DefaultTriggerStateMachine.of();
+      case NEVER:
+        return NeverStateMachine.ever();
+      case ALWAYS:
+        return ReshuffleTriggerStateMachine.create();
+      case OR_FINALLY:
+        return stateMachineForTrigger(trigger.getOrFinally().getMain())
+            .orFinally(stateMachineForTrigger(trigger.getOrFinally().getFinally()));
+      case REPEAT:
+        return RepeatedlyStateMachine.forever(
+            stateMachineForTrigger(trigger.getRepeat().getSubtrigger()));
+      case AFTER_EACH:
+        return AfterEachStateMachine.inOrder(
+            stateMachinesForTriggers(trigger.getAfterEach().getSubtriggersList()));
+      case AFTER_PROCESSING_TIME:
+        return stateMachineForAfterProcessingTime(trigger.getAfterProcessingTime());
+      case TRIGGER_NOT_SET:
+        throw new IllegalArgumentException(
+            String.format("Required field 'trigger' not set on %s", trigger));
+      default:
+        throw new IllegalArgumentException(String.format("Unknown trigger type %s", trigger));
     }
+  }
 
-    private OnceTriggerStateMachine 
evaluateSpecific(AfterWatermark.FromEndOfWindow v) {
+  private static TriggerStateMachine stateMachineForAfterEndOfWindow(
+      RunnerApi.Trigger.AfterEndOfWindow trigger) {
+    if (!trigger.hasEarlyFirings() && !trigger.hasLateFirings()) {
       return AfterWatermarkStateMachine.pastEndOfWindow();
-    }
-
-    private OnceTriggerStateMachine evaluateSpecific(NeverTrigger v) {
-      return NeverStateMachine.ever();
-    }
-
-    private OnceTriggerStateMachine 
evaluateSpecific(AfterSynchronizedProcessingTime v) {
-      return AfterSynchronizedProcessingTimeStateMachine.ofFirstElement();
-    }
-
-    private OnceTriggerStateMachine evaluateSpecific(AfterFirst v) {
-      List<OnceTriggerStateMachine> subStateMachines =
-          Lists.newArrayListWithCapacity(v.subTriggers().size());
-      for (Trigger subtrigger : v.subTriggers()) {
-        subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) 
subtrigger));
-      }
-      return AfterFirstStateMachine.of(subStateMachines);
-    }
-
-    private OnceTriggerStateMachine evaluateSpecific(AfterAll v) {
-      List<OnceTriggerStateMachine> subStateMachines =
-          Lists.newArrayListWithCapacity(v.subTriggers().size());
-      for (Trigger subtrigger : v.subTriggers()) {
-        subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) 
subtrigger));
-      }
-      return AfterAllStateMachine.of(subStateMachines);
-    }
-
-    private OnceTriggerStateMachine evaluateSpecific(AfterPane v) {
-      return AfterPaneStateMachine.elementCountAtLeast(v.getElementCount());
-    }
-
-    private TriggerStateMachine 
evaluateSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) {
+    } else {
       AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
           AfterWatermarkStateMachine.pastEndOfWindow()
-              
.withEarlyFirings(stateMachineForOnceTrigger(v.getEarlyTrigger()));
+              .withEarlyFirings(stateMachineForTrigger(trigger.getEarlyFirings()));
 
-      if (v.getLateTrigger() != null) {
-        machine = 
machine.withLateFirings(stateMachineForOnceTrigger(v.getLateTrigger()));
+      if (trigger.hasLateFirings()) {
+        machine = machine.withLateFirings(stateMachineForTrigger(trigger.getLateFirings()));
       }
       return machine;
     }
+  }
 
-    private TriggerStateMachine evaluateSpecific(AfterEach v) {
-      List<TriggerStateMachine> subStateMachines =
-          Lists.newArrayListWithCapacity(v.subTriggers().size());
-
-      for (Trigger subtrigger : v.subTriggers()) {
-        subStateMachines.add(stateMachineForTrigger(subtrigger));
+  private static TriggerStateMachine stateMachineForAfterProcessingTime(
+      RunnerApi.Trigger.AfterProcessingTime trigger) {
+    AfterDelayFromFirstElementStateMachine stateMachine =
+        AfterProcessingTimeStateMachine.pastFirstElementInPane();
+    for (RunnerApi.TimestampTransform transform : trigger.getTimestampTransformsList()) {
+      switch (transform.getTimestampTransformCase()) {
+        case ALIGN_TO:
+          stateMachine =
+              stateMachine.alignedTo(
+                  Duration.millis(transform.getAlignTo().getPeriod()),
+                  new Instant(transform.getAlignTo().getOffset()));
+          break;
+        case DELAY:
+          stateMachine =
+              stateMachine.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis()));
+          break;
+        case TIMESTAMPTRANSFORM_NOT_SET:
+          throw new IllegalArgumentException(
+              String.format("Required field 'timestamp_transform' not set in %s", transform));
+        default:
+          throw new IllegalArgumentException(
+              String.format(
+                  "Unknown timestamp transform case: %s", transform.getTimestampTransformCase()));
       }
-
-      return AfterEachStateMachine.inOrder(subStateMachines);
-    }
-
-    private TriggerStateMachine evaluateSpecific(Repeatedly v) {
-      return 
RepeatedlyStateMachine.forever(stateMachineForTrigger(v.getRepeatedTrigger()));
-    }
-
-    private TriggerStateMachine evaluateSpecific(OrFinallyTrigger v) {
-      return new OrFinallyStateMachine(
-          stateMachineForTrigger(v.getMainTrigger()),
-          stateMachineForOnceTrigger(v.getUntilTrigger()));
-    }
-
-    private OnceTriggerStateMachine evaluateSpecific(AfterProcessingTime v) {
-      return new AfterDelayFromFirstElementStateMachineAdapter(v);
     }
+    return stateMachine;
+  }
 
-    private static class AfterDelayFromFirstElementStateMachineAdapter
-        extends AfterDelayFromFirstElementStateMachine {
-
-      private static final Function<TimestampTransform, 
SerializableFunction<Instant, Instant>>
-          CONVERT_TIMESTAMP_TRANSFORM =
-              new Function<TimestampTransform, SerializableFunction<Instant, 
Instant>>() {
-                @Override
-                public SerializableFunction<Instant, Instant> apply(
-                    @Nonnull TimestampTransform transform) {
-                  if (transform instanceof TimestampTransform.Delay) {
-                    return new DelayFn(((TimestampTransform.Delay) 
transform).getDelay());
-                  } else if (transform instanceof TimestampTransform.AlignTo) {
-                    TimestampTransform.AlignTo alignTo = 
(TimestampTransform.AlignTo) transform;
-                    return new AlignFn(alignTo.getPeriod(), 
alignTo.getOffset());
-                  } else {
-                    throw new IllegalArgumentException(
-                        String.format(
-                            "Unknown %s: %s", 
TimestampTransform.class.getSimpleName(), transform));
-                  }
-                }
-              };
-
-      public AfterDelayFromFirstElementStateMachineAdapter(AfterProcessingTime 
v) {
-        this(
-            TimeDomain.PROCESSING_TIME,
-            FluentIterable.from(v.getTimestampTransforms())
-                .transform(CONVERT_TIMESTAMP_TRANSFORM)
-                .toList());
-      }
-
-      private AfterDelayFromFirstElementStateMachineAdapter(
-          TimeDomain timeDomain, List<SerializableFunction<Instant, Instant>> 
timestampMappers) {
-        super(timeDomain, timestampMappers);
-      }
-
-      @Override
-      public Instant getCurrentTime(TriggerContext context) {
-        switch (timeDomain) {
-          case PROCESSING_TIME:
-            return context.currentProcessingTime();
-          case SYNCHRONIZED_PROCESSING_TIME:
-            return context.currentSynchronizedProcessingTime();
-          case EVENT_TIME:
-            return context.currentEventTime();
-          default:
-            throw new IllegalArgumentException("A time domain that doesn't 
exist was received!");
-        }
-      }
-
-      @Override
-      protected AfterDelayFromFirstElementStateMachine newWith(
-          List<SerializableFunction<Instant, Instant>> transform) {
-        return new AfterDelayFromFirstElementStateMachineAdapter(timeDomain, 
transform);
-      }
+  private static List<TriggerStateMachine> stateMachinesForTriggers(
+      List<RunnerApi.Trigger> triggers) {
+    List<TriggerStateMachine> stateMachines = new ArrayList<>(triggers.size());
+    for (RunnerApi.Trigger trigger : triggers) {
+      stateMachines.add(stateMachineForTrigger(trigger));
     }
+    return stateMachines;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index dab2bf9..d18a1c3 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -60,6 +60,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -122,7 +123,8 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
       nonCombining(WindowingStrategy<?, W> windowingStrategy) throws Exception 
{
     return new ReduceFnTester<Integer, Iterable<Integer>, W>(
         windowingStrategy,
-        
TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger()),
+        TriggerStateMachines.stateMachineForTrigger(
+            Triggers.toProto(windowingStrategy.getTrigger())),
         SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of()),
         IterableCoder.of(VarIntCoder.of()),
         PipelineOptionsFactory.create(),
@@ -186,7 +188,7 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
 
     return combining(
         strategy,
-        TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger()),
+        
TriggerStateMachines.stateMachineForTrigger(Triggers.toProto(strategy.getTrigger())),
         combineFn,
         outputCoder);
   }
@@ -236,7 +238,7 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
 
     return combining(
         strategy,
-        TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger()),
+        
TriggerStateMachines.stateMachineForTrigger(Triggers.toProto(strategy.getTrigger())),
         combineFn,
         outputCoder,
         options,

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
index 26c0597..497a3c2 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
@@ -19,21 +19,10 @@ package org.apache.beam.runners.core.triggers;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
-import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
-import org.apache.beam.sdk.transforms.windowing.AfterAll;
-import org.apache.beam.sdk.transforms.windowing.AfterEach;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
-import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.joda.time.Duration;
 import org.junit.Test;
@@ -51,49 +40,79 @@ public class TriggerStateMachinesTest {
   @Test
   public void testStateMachineForAfterPane() {
     int count = 37;
-    AfterPane trigger = AfterPane.elementCountAtLeast(count);
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setElementCount(RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(count))
+            .build();
+
     AfterPaneStateMachine machine =
-        (AfterPaneStateMachine) TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+        (AfterPaneStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
 
-    assertThat(machine.getElementCount(), equalTo(trigger.getElementCount()));
+    assertThat(machine.getElementCount(), equalTo(trigger.getElementCount().getElementCount()));
   }
 
+  // TODO: make these all build the proto
   @Test
   public void testStateMachineForAfterProcessingTime() {
     Duration minutes = Duration.standardMinutes(94);
     Duration hours = Duration.standardHours(13);
 
-    AfterProcessingTime trigger =
-        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(minutes).alignedTo(hours);
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setAfterProcessingTime(
+                RunnerApi.Trigger.AfterProcessingTime.newBuilder()
+                    .addTimestampTransforms(
+                        RunnerApi.TimestampTransform.newBuilder()
+                            .setDelay(
+                                RunnerApi.TimestampTransform.Delay.newBuilder()
+                                    .setDelayMillis(minutes.getMillis())))
+                    .addTimestampTransforms(
+                        RunnerApi.TimestampTransform.newBuilder()
+                            .setAlignTo(
+                                RunnerApi.TimestampTransform.AlignTo.newBuilder()
+                                    .setPeriod(hours.getMillis()))))
+            .build();
 
     AfterDelayFromFirstElementStateMachine machine =
         (AfterDelayFromFirstElementStateMachine)
-            TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+            TriggerStateMachines.stateMachineForTrigger(trigger);
 
     assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME));
   }
 
   @Test
   public void testStateMachineForAfterWatermark() {
-    AfterWatermark.FromEndOfWindow trigger = AfterWatermark.pastEndOfWindow();
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setAfterEndOfWidow(RunnerApi.Trigger.AfterEndOfWindow.getDefaultInstance())
+            .build();
     AfterWatermarkStateMachine.FromEndOfWindow machine =
         (AfterWatermarkStateMachine.FromEndOfWindow)
-            TriggerStateMachines.stateMachineForOnceTrigger(trigger);
-    // No parameters, so if it doesn't crash, we win!
+            TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(
+        TriggerStateMachines.stateMachineForTrigger(trigger),
+        instanceOf(AfterWatermarkStateMachine.FromEndOfWindow.class));
   }
 
   @Test
   public void testDefaultTriggerTranslation() {
-    DefaultTrigger trigger = DefaultTrigger.of();
-    DefaultTriggerStateMachine machine =
-        (DefaultTriggerStateMachine)
-            checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger));
-    // No parameters, so if it doesn't crash, we win!
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setDefault(RunnerApi.Trigger.Default.getDefaultInstance())
+            .build();
+
+    assertThat(
+        TriggerStateMachines.stateMachineForTrigger(trigger),
+        instanceOf(DefaultTriggerStateMachine.class));
   }
 
   @Test
   public void testNeverTranslation() {
-    NeverTrigger trigger = Never.ever();
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setNever(RunnerApi.Trigger.Never.getDefaultInstance())
+            .build();
     NeverStateMachine machine =
         (NeverStateMachine) checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger));
     // No parameters, so if it doesn't crash, we win!
@@ -109,18 +128,35 @@ public class TriggerStateMachinesTest {
   private static final int ELEM_COUNT = 472;
   private static final Duration DELAY = Duration.standardSeconds(95673);
 
-  private final OnceTrigger subtrigger1 = AfterPane.elementCountAtLeast(ELEM_COUNT);
-  private final OnceTrigger subtrigger2 =
-      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DELAY);
+  private final RunnerApi.Trigger subtrigger1 =
+      RunnerApi.Trigger.newBuilder()
+          .setElementCount(RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(ELEM_COUNT))
+          .build();
+  private final RunnerApi.Trigger subtrigger2 =
+      RunnerApi.Trigger.newBuilder()
+          .setAfterProcessingTime(
+              RunnerApi.Trigger.AfterProcessingTime.newBuilder()
+                  .addTimestampTransforms(
+                      RunnerApi.TimestampTransform.newBuilder()
+                          .setDelay(
+                              RunnerApi.TimestampTransform.Delay.newBuilder()
+                                  .setDelayMillis(DELAY.getMillis()))))
+          .build();
 
-  private final OnceTriggerStateMachine submachine1 =
-      TriggerStateMachines.stateMachineForOnceTrigger(subtrigger1);
-  private final OnceTriggerStateMachine submachine2 =
-      TriggerStateMachines.stateMachineForOnceTrigger(subtrigger2);
+  private final TriggerStateMachine submachine1 =
+      TriggerStateMachines.stateMachineForTrigger(subtrigger1);
+  private final TriggerStateMachine submachine2 =
+      TriggerStateMachines.stateMachineForTrigger(subtrigger2);
 
   @Test
   public void testAfterEachTranslation() {
-    AfterEach trigger = AfterEach.inOrder(subtrigger1, subtrigger2);
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setAfterEach(
+                RunnerApi.Trigger.AfterEach.newBuilder()
+                    .addSubtriggers(subtrigger1)
+                    .addSubtriggers(subtrigger2))
+            .build();
     AfterEachStateMachine machine =
         (AfterEachStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
 
@@ -129,7 +165,13 @@ public class TriggerStateMachinesTest {
 
   @Test
   public void testAfterFirstTranslation() {
-    AfterFirst trigger = AfterFirst.of(subtrigger1, subtrigger2);
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setAfterAny(
+                RunnerApi.Trigger.AfterAny.newBuilder()
+                    .addSubtriggers(subtrigger1)
+                    .addSubtriggers(subtrigger2))
+            .build();
     AfterFirstStateMachine machine =
         (AfterFirstStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
 
@@ -138,7 +180,13 @@ public class TriggerStateMachinesTest {
 
   @Test
   public void testAfterAllTranslation() {
-    AfterAll trigger = AfterAll.of(subtrigger1, subtrigger2);
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setAfterAll(
+                RunnerApi.Trigger.AfterAll.newBuilder()
+                    .addSubtriggers(subtrigger1)
+                    .addSubtriggers(subtrigger2))
+            .build();
     AfterAllStateMachine machine =
         (AfterAllStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
 
@@ -147,8 +195,11 @@ public class TriggerStateMachinesTest {
 
   @Test
   public void testAfterWatermarkEarlyTranslation() {
-    AfterWatermark.AfterWatermarkEarlyAndLate trigger =
-        AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1);
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setAfterEndOfWidow(
+                RunnerApi.Trigger.AfterEndOfWindow.newBuilder().setEarlyFirings(subtrigger1))
+            .build();
     AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
         (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
             TriggerStateMachines.stateMachineForTrigger(trigger);
@@ -160,8 +211,13 @@ public class TriggerStateMachinesTest {
 
   @Test
   public void testAfterWatermarkEarlyLateTranslation() {
-    AfterWatermark.AfterWatermarkEarlyAndLate trigger =
-        AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1).withLateFirings(subtrigger2);
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setAfterEndOfWidow(
+                RunnerApi.Trigger.AfterEndOfWindow.newBuilder()
+                    .setEarlyFirings(subtrigger1)
+                    .setLateFirings(subtrigger2))
+            .build();
     AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
         (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
             TriggerStateMachines.stateMachineForTrigger(trigger);
@@ -176,18 +232,30 @@ public class TriggerStateMachinesTest {
 
   @Test
   public void testOrFinallyTranslation() {
-    OrFinallyTrigger trigger = subtrigger1.orFinally(subtrigger2);
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setOrFinally(
+                RunnerApi.Trigger.OrFinally.newBuilder()
+                    .setMain(subtrigger1)
+                    .setFinally(subtrigger2))
+            .build();
     OrFinallyStateMachine machine =
-        (OrFinallyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+        (OrFinallyStateMachine)
+            TriggerStateMachines.stateMachineForTrigger(trigger);
 
     assertThat(machine, equalTo(submachine1.orFinally(submachine2)));
   }
 
   @Test
   public void testRepeatedlyTranslation() {
-    Repeatedly trigger = Repeatedly.forever(subtrigger1);
+    RunnerApi.Trigger trigger =
+        RunnerApi.Trigger.newBuilder()
+            .setRepeat(
+                RunnerApi.Trigger.Repeat.newBuilder()
+                    .setSubtrigger(subtrigger1)).build();
     RepeatedlyStateMachine machine =
-        (RepeatedlyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+        (RepeatedlyStateMachine)
+            TriggerStateMachines.stateMachineForTrigger(trigger);
 
     assertThat(machine, equalTo(RepeatedlyStateMachine.forever(submachine1)));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index ecf4ecd..dc0ac60 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -172,7 +173,8 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
               key,
               windowingStrategy,
               ExecutableTriggerStateMachine.create(
-                  TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
+                  TriggerStateMachines.stateMachineForTrigger(
+                      Triggers.toProto(windowingStrategy.getTrigger()))),
               stateInternals,
               timerInternals,
               new OutputWindowedValueToBundle<>(bundle),

http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index 44d5b7c..bd37fdb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -101,29 +102,30 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow>
             key,
             windowingStrategy,
             ExecutableTriggerStateMachine.create(
-                TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
+                TriggerStateMachines.stateMachineForTrigger(
+                    Triggers.toProto(windowingStrategy.getTrigger()))),
             stateInternals,
             timerInternals,
             outputter,
             new SideInputReader() {
-                @Override
-                public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) {
-                  throw new UnsupportedOperationException(
-                      "GroupAlsoByWindow must not have side inputs");
-                }
-
-                @Override
-                public <T> boolean contains(PCollectionView<T> view) {
-                  throw new UnsupportedOperationException(
-                      "GroupAlsoByWindow must not have side inputs");
-                }
-
-                @Override
-                public boolean isEmpty() {
-                  throw new UnsupportedOperationException(
-                      "GroupAlsoByWindow must not have side inputs");
-                }
-              },
+              @Override
+              public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) {
+                throw new UnsupportedOperationException(
+                    "GroupAlsoByWindow must not have side inputs");
+              }
+
+              @Override
+              public <T> boolean contains(PCollectionView<T> view) {
+                throw new UnsupportedOperationException(
+                    "GroupAlsoByWindow must not have side inputs");
+              }
+
+              @Override
+              public boolean isEmpty() {
+                throw new UnsupportedOperationException(
+                    "GroupAlsoByWindow must not have side inputs");
+              }
+            },
             droppedDueToClosedWindow,
             reduceFn,
             runtimeContext.getPipelineOptions());

Reply via email to