http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
new file mode 100644
index 0000000..2efaad3
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+/**
+ * The {@link TransformEvaluatorFactory} that the {@link InProcessPipelineRunner} uses to evaluate
+ * the {@link Flatten} {@link PTransform}.
+ */
+class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
+  /**
+   * Creates the evaluator for one input bundle of a {@link Flatten} application.
+   *
+   * <p>The wildcard-typed arguments are narrowed through raw casts before being passed to
+   * {@code createInMemoryEvaluator}, which is why the compiler warnings are suppressed here.
+   */
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+    TransformEvaluator<InputT> flattenEvaluator =
+        (TransformEvaluator<InputT>)
+            createInMemoryEvaluator(
+                (AppliedPTransform) application, inputBundle, evaluationContext);
+    return flattenEvaluator;
+  }
+
+  /**
+   * Builds a {@link FlattenEvaluator} that copies every element of {@code inputBundle} into a
+   * fresh bundle of the flattened output {@link PCollection}.
+   *
+   * <p>When {@code inputBundle} is {@code null} the returned evaluator has no output bundle and a
+   * result without outputs.
+   */
+  private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
+      final AppliedPTransform<
+              PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
+          application,
+      final CommittedBundle<InputT> inputBundle,
+      final InProcessEvaluationContext evaluationContext) {
+    if (inputBundle != null) {
+      final UncommittedBundle<InputT> flattened =
+          evaluationContext.createBundle(inputBundle, application.getOutput());
+      final InProcessTransformResult outcome =
+          StepTransformResult.withoutHold(application).addOutput(flattened).build();
+      return new FlattenEvaluator<>(flattened, outcome);
+    }
+    // A Flatten without an input bundle is what Flatten.pcollections(PCollectionList.empty())
+    // produces. processElement can never be invoked on such an evaluator, so it is given no
+    // output bundle at all.
+    return new FlattenEvaluator<>(
+        null, StepTransformResult.withoutHold(application).build());
+  }
+
+  /**
+   * A {@link TransformEvaluator} that appends each incoming element, unchanged, to a single
+   * output bundle and reports a result that was computed up front.
+   */
+  private static class FlattenEvaluator<InputT> implements TransformEvaluator<InputT> {
+    /** Destination of every processed element; {@code null} for a Flatten with no input. */
+    private final UncommittedBundle<InputT> destination;
+    /** The result handed back from {@link #finishBundle()}. */
+    private final InProcessTransformResult transformResult;
+
+    public FlattenEvaluator(
+        UncommittedBundle<InputT> outputBundle, InProcessTransformResult result) {
+      this.destination = outputBundle;
+      this.transformResult = result;
+    }
+
+    @Override
+    public void processElement(WindowedValue<InputT> element) {
+      destination.add(element);
+    }
+
+    @Override
+    public InProcessTransformResult finishBundle() {
+      return transformResult;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
new file mode 100644
index 0000000..3160b58
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TypedPValue;
+
+/**
+ * A base class for implementing {@link PTransform} overrides, which behave 
identically to the
+ * delegate transform but with overridden methods. Implementors are required 
to implement
+ * {@link #delegate()}, which returns the object to forward calls to, and 
{@link #apply(PInput)}.
+ */
+public abstract class ForwardingPTransform<InputT extends PInput, OutputT 
extends POutput>
+    extends PTransform<InputT, OutputT> {
+  protected abstract PTransform<InputT, OutputT> delegate();
+
+  @Override
+  public OutputT apply(InputT input) {
+    return delegate().apply(input);
+  }
+
+  @Override
+  public void validate(InputT input) {
+    delegate().validate(input);
+  }
+
+  @Override
+  public String getName() {
+    return delegate().getName();
+  }
+
+  @Override
+  public <T> Coder<T> getDefaultOutputCoder(InputT input, 
@SuppressWarnings("unused")
+      TypedPValue<T> output) throws CannotProvideCoderException {
+    return delegate().getDefaultOutputCoder(input, output);
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    delegate().populateDisplayData(builder);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
new file mode 100644
index 0000000..874ec17
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.StepTransformResult.Builder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
+import 
org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.SystemReduceFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link GroupByKey}
+ * {@link PTransform}.
+ *
+ * <p>The evaluators created here execute {@link InProcessGroupByKeyOnly}, the grouping step of the
+ * composite that {@link InProcessGroupByKeyOverrideFactory} substitutes for {@link GroupByKey}.
+ */
+class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory {
+  /**
+   * Creates the evaluator for one input bundle of an {@link InProcessGroupByKeyOnly} application.
+   *
+   * <p>The wildcard-typed arguments are passed on as raw types, which is why the compiler
+   * warnings are suppressed here.
+   */
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+    TransformEvaluator<InputT> evaluator = createEvaluator(
+            (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
+    return evaluator;
+  }
+
+  /** Typed counterpart of {@link #forApplication}; simply constructs the evaluator. */
+  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
+      final AppliedPTransform<
+              PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+              InProcessGroupByKeyOnly<K, V>>
+          application,
+      final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
+      final InProcessEvaluationContext evaluationContext) {
+    return new GroupByKeyEvaluator<K, V>(evaluationContext, inputBundle, application);
+  }
+
+  /**
+   * Buffers the values of one input bundle per key and, when the bundle is finished, emits one
+   * keyed output bundle per distinct key holding a single {@link KeyedWorkItem}.
+   *
+   * <p>Keys are considered equal when their encodings under the input key {@link Coder} are equal.
+   */
+  private static class GroupByKeyEvaluator<K, V>
+      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
+    private final InProcessEvaluationContext evaluationContext;
+
+    // The element type matches the input PCollection of the application: the values have already
+    // been wrapped in WindowedValue by the time they reach this evaluator.
+    private final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle;
+    private final AppliedPTransform<
+            PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+            InProcessGroupByKeyOnly<K, V>>
+        application;
+    private final Coder<K> keyCoder;
+    /** Values seen so far in this bundle, grouped by encoded key. */
+    private final Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
+
+    public GroupByKeyEvaluator(
+        InProcessEvaluationContext evaluationContext,
+        CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
+        AppliedPTransform<
+                PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+                InProcessGroupByKeyOnly<K, V>>
+            application) {
+      this.evaluationContext = evaluationContext;
+      this.inputBundle = inputBundle;
+      this.application = application;
+
+      PCollection<KV<K, WindowedValue<V>>> input = application.getInput();
+      keyCoder = getKeyCoder(input.getCoder());
+      groupingMap = new HashMap<>();
+    }
+
+    /**
+     * Extracts the key coder from the coder of the input {@link PCollection}.
+     *
+     * @throws IllegalStateException if {@code coder} is not a {@link KvCoder}
+     */
+    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
+      if (!(coder instanceof KvCoder)) {
+        throw new IllegalStateException(
+            "Expected the input coder of a GroupByKey to be a KvCoder, but was " + coder);
+      }
+      @SuppressWarnings("unchecked")
+      Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
+      return keyCoder;
+    }
+
+    /**
+     * Encodes the key of {@code element} and appends its value to the group for that key.
+     *
+     * @throws IllegalArgumentException if the key cannot be encoded with the key coder
+     */
+    @Override
+    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
+      KV<K, WindowedValue<V>> kv = element.getValue();
+      K key = kv.getKey();
+      byte[] encodedKey;
+      try {
+        encodedKey = encodeToByteArray(keyCoder, key);
+      } catch (CoderException exn) {
+        // TODO: Put in better element printing:
+        // truncate if too long.
+        throw new IllegalArgumentException(
+            String.format("unable to encode key %s of input to %s using %s", key, this, keyCoder),
+            exn);
+      }
+      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
+      List<WindowedValue<V>> values = groupingMap.get(groupingKey);
+      if (values == null) {
+        values = new ArrayList<WindowedValue<V>>();
+        groupingMap.put(groupingKey, values);
+      }
+      values.add(kv.getValue());
+    }
+
+    /**
+     * Emits, for every key seen in this bundle, a keyed bundle containing one
+     * {@link KeyedWorkItem} (placed in the global window) with all values buffered for that key.
+     */
+    @Override
+    public InProcessTransformResult finishBundle() {
+      Builder resultBuilder = StepTransformResult.withoutHold(application);
+      for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
+          groupingMap.entrySet()) {
+        K key = groupedEntry.getKey().key;
+        KeyedWorkItem<K, V> groupedKv =
+            KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
+        UncommittedBundle<KeyedWorkItem<K, V>> bundle =
+            evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput());
+        bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
+        resultBuilder.addOutput(bundle);
+      }
+      return resultBuilder.build();
+    }
+
+    /**
+     * A map key pairing a user key with its encoded form. Equality and hashing use only the
+     * encoded bytes, never the {@code equals}/{@code hashCode} of the key object itself.
+     */
+    private static class GroupingKey<K> {
+      private final K key;
+      private final byte[] encodedKey;
+
+      public GroupingKey(K key, byte[] encodedKey) {
+        this.key = key;
+        this.encodedKey = encodedKey;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (o instanceof GroupingKey) {
+          GroupingKey<?> that = (GroupingKey<?>) o;
+          return Arrays.equals(this.encodedKey, that.encodedKey);
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public int hashCode() {
+        return Arrays.hashCode(encodedKey);
+      }
+    }
+  }
+
+  /**
+   * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. Any other transform
+   * is returned unchanged.
+   */
+  public static final class InProcessGroupByKeyOverrideFactory
+      implements PTransformOverrideFactory {
+    @Override
+    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+        PTransform<InputT, OutputT> transform) {
+      if (transform instanceof GroupByKey) {
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        PTransform<InputT, OutputT> override = new InProcessGroupByKey((GroupByKey) transform);
+        return override;
+      }
+      return transform;
+    }
+  }
+
+  /**
+   * An in-memory implementation of the {@link GroupByKey} primitive as a composite
+   * {@link PTransform}. Everything except {@link #apply} is forwarded to the original
+   * {@link GroupByKey}.
+   */
+  private static final class InProcessGroupByKey<K, V>
+      extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+    private final GroupByKey<K, V> original;
+
+    private InProcessGroupByKey(GroupByKey<K, V> from) {
+      this.original = from;
+    }
+
+    @Override
+    public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
+      return original;
+    }
+
+    /**
+     * Expands the {@link GroupByKey} into reify-timestamps, group-by-key-only and
+     * group-also-by-window steps, setting the coder of each intermediate output explicitly.
+     */
+    @Override
+    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+      // The cast fails with a ClassCastException if the input is not encoded with a KvCoder.
+      KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
+
+      // This operation groups by the combination of key and window,
+      // merging windows as needed, using the windows assigned to the
+      // key/value input elements and the window merge operation of the
+      // window function associated with the input PCollection.
+      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+      // Use the default GroupAlsoByWindow implementation
+      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow =
+          groupAlsoByWindow(windowingStrategy, inputCoder.getValueCoder());
+
+      // By default, implement GroupByKey via a series of lower-level operations.
+      return input
+          // Make each input element's timestamp and assigned windows
+          // explicit, in the value part.
+          .apply(new ReifyTimestampsAndWindows<K, V>())
+
+          .apply(new InProcessGroupByKeyOnly<K, V>())
+          .setCoder(KeyedWorkItemCoder.of(inputCoder.getKeyCoder(),
+              inputCoder.getValueCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()))
+
+          // Group each key's values by window, merging windows as needed.
+          .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow))
+
+          // And update the windowing strategy as appropriate.
+          .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
+          .setCoder(
+              KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
+    }
+
+    /** Creates the {@link DoFn} that groups the values of each key by window, buffering them. */
+    private <W extends BoundedWindow>
+        DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow(
+            final WindowingStrategy<?, W> windowingStrategy, final Coder<V> inputCoder) {
+      return GroupAlsoByWindowViaWindowSetDoFn.create(
+          windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
+    }
+  }
+
+  /**
+   * An implementation primitive to use in the evaluation of a {@link GroupByKey}
+   * {@link PTransform}.
+   */
+  public static final class InProcessGroupByKeyOnly<K, V>
+      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
+    /**
+     * Declares the primitive output, which keeps the pipeline, windowing strategy and boundedness
+     * of {@code input}.
+     */
+    @Override
+    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
+      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    }
+
+    @VisibleForTesting
+    InProcessGroupByKeyOnly() {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
new file mode 100644
index 0000000..2103ad3
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.IllegalMutationException;
+import org.apache.beam.sdk.util.MutationDetector;
+import org.apache.beam.sdk.util.MutationDetectors;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.api.client.util.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
+import org.joda.time.Instant;
+
+/**
+ * A {@link BundleFactory} that ensures that elements added to it are not 
mutated after being
+ * output. Immutability checks are enforced at the time {@link 
UncommittedBundle#commit(Instant)} is
+ * called, checking the value at that time against the value at the time the 
element was added. All
+ * elements added to the bundle will be encoded by the {@link Coder} of the 
underlying
+ * {@link PCollection}.
+ *
+ * <p>This catches errors during the execution of a {@link DoFn} caused by 
modifying an element
+ * after it is added to an output {@link PCollection}.
+ */
+class ImmutabilityCheckingBundleFactory implements BundleFactory {
+  /**
+   * Create a new {@link ImmutabilityCheckingBundleFactory} that uses the 
underlying
+   * {@link BundleFactory} to create the output bundle.
+   */
+  public static ImmutabilityCheckingBundleFactory create(BundleFactory 
underlying) {
+    return new ImmutabilityCheckingBundleFactory(underlying);
+  }
+
+  private final BundleFactory underlying;
+
+  private ImmutabilityCheckingBundleFactory(BundleFactory underlying) {
+    this.underlying = checkNotNull(underlying);
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    return new 
ImmutabilityEnforcingBundle<>(underlying.createRootBundle(output));
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, 
PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createBundle(input, 
output));
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output) {
+    return new 
ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output));
+  }
+
+  private static class ImmutabilityEnforcingBundle<T> implements 
UncommittedBundle<T> {
+    private final UncommittedBundle<T> underlying;
+    private final SetMultimap<WindowedValue<T>, MutationDetector> 
mutationDetectors;
+    private Coder<T> coder;
+
+    public ImmutabilityEnforcingBundle(UncommittedBundle<T> underlying) {
+      this.underlying = underlying;
+      mutationDetectors = HashMultimap.create();
+      coder = getPCollection().getCoder();
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return underlying.getPCollection();
+    }
+
+    @Override
+    public UncommittedBundle<T> add(WindowedValue<T> element) {
+      try {
+        mutationDetectors.put(
+            element, MutationDetectors.forValueWithCoder(element.getValue(), 
coder));
+      } catch (CoderException e) {
+        throw Throwables.propagate(e);
+      }
+      underlying.add(element);
+      return this;
+    }
+
+    @Override
+    public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
+      for (MutationDetector detector : mutationDetectors.values()) {
+        try {
+          detector.verifyUnmodified();
+        } catch (IllegalMutationException exn) {
+          throw UserCodeException.wrap(
+              new IllegalMutationException(
+                  String.format(
+                      "PTransform %s mutated value %s after it was output (new 
value was %s)."
+                          + " Values must not be mutated in any way after 
being output.",
+                      
underlying.getPCollection().getProducingTransformInternal().getFullName(),
+                      exn.getSavedValue(),
+                      exn.getNewValue()),
+                  exn.getSavedValue(),
+                  exn.getNewValue(),
+                  exn));
+        }
+      }
+      return underlying.commit(synchronizedProcessingTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
new file mode 100644
index 0000000..bfecc9d
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.util.IllegalMutationException;
+import org.apache.beam.sdk.util.MutationDetector;
+import org.apache.beam.sdk.util.MutationDetectors;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import java.util.IdentityHashMap;
+import java.util.Map;
+
+/**
+ * {@link ModelEnforcement} that enforces elements are not modified over the course of processing
+ * an element.
+ *
+ * <p>Implies encodability enforcement: each input element is run through the input
+ * {@link Coder} when its {@link MutationDetector} is created, and a failure to encode is reported
+ * as a {@link UserCodeException}.
+ */
+class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
+  public static ModelEnforcementFactory create() {
+    return new ImmutabilityEnforcementFactory();
+  }
+
+  @Override
+  public <T> ModelEnforcement<T> forBundle(
+      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+    return new ImmutabilityCheckingEnforcement<T>(input, consumer);
+  }
+
+  /**
+   * Tracks one {@link MutationDetector} per input element and verifies it both right after the
+   * element has been processed and again once the whole bundle is finished.
+   */
+  private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> {
+    private final AppliedPTransform<?, ?, ?> transform;
+    // Keyed by identity, so lookups never depend on equals/hashCode of a possibly mutated value.
+    private final Map<WindowedValue<T>, MutationDetector> mutationElements;
+    private final Coder<T> coder;
+
+    private ImmutabilityCheckingEnforcement(
+        CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) {
+      this.transform = transform;
+      coder = input.getPCollection().getCoder();
+      mutationElements = new IdentityHashMap<>();
+    }
+
+    /** Snapshots {@code element} so that later mutations can be detected. */
+    @Override
+    public void beforeElement(WindowedValue<T> element) {
+      try {
+        mutationElements.put(
+            element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
+      } catch (CoderException e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+
+    /** Verifies the detector registered for {@code element} in {@link #beforeElement}. */
+    @Override
+    public void afterElement(WindowedValue<T> element) {
+      verifyUnmodified(mutationElements.get(element));
+    }
+
+    /** Re-verifies every element of the bundle once processing has finished. */
+    @Override
+    public void afterFinish(
+        CommittedBundle<T> input,
+        InProcessTransformResult result,
+        Iterable<? extends CommittedBundle<?>> outputs) {
+      for (MutationDetector detector : mutationElements.values()) {
+        verifyUnmodified(detector);
+      }
+    }
+
+    /**
+     * Runs {@code detector} and, if it reports a mutation, rethrows with a message naming the
+     * consuming transform.
+     *
+     * @throws IllegalMutationException if the value was modified; the detector's original
+     *     exception is kept as the cause
+     */
+    private void verifyUnmodified(MutationDetector detector) {
+      try {
+        detector.verifyUnmodified();
+      } catch (IllegalMutationException e) {
+        throw new IllegalMutationException(
+            String.format(
+                "PTransform %s illegaly mutated value %s of class %s."
+                    + " Input values must not be mutated in any way.",
+                transform.getFullName(),
+                e.getSavedValue(),
+                e.getSavedValue().getClass()),
+            e.getSavedValue(),
+            e.getNewValue(),
+            // Chain the detector's exception so its stack trace and details are not lost.
+            e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
new file mode 100644
index 0000000..07b6bb4
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -0,0 +1,1327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.TreeMultiset;
+
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+/**
+ * Manages watermarks of {@link PCollection PCollections} and input and output 
watermarks of
+ * {@link AppliedPTransform AppliedPTransforms} to provide event-time and 
completion tracking for
+ * in-memory execution. {@link InMemoryWatermarkManager} is designed to update 
and return a
+ * consistent view of watermarks in the presence of concurrent updates.
+ *
+ * <p>An {@link InMemoryWatermarkManager} is provided with the collection of 
root
+ * {@link AppliedPTransform AppliedPTransforms} and a map of {@link 
PCollection PCollections} to
+ * all the {@link AppliedPTransform AppliedPTransforms} that consume them at 
construction time.
+ *
+ * <p>Whenever a root {@link AppliedPTransform transform} produces elements, 
the
+ * {@link InMemoryWatermarkManager} is provided with the produced elements and 
the output watermark
+ * of the producing {@link AppliedPTransform transform}. The
+ * {@link InMemoryWatermarkManager watermark manager} is responsible for 
computing the watermarks
+ * of all {@link AppliedPTransform transforms} that consume one or more
+ * {@link PCollection PCollections}.
+ *
+ * <p>Whenever a non-root {@link AppliedPTransform} finishes processing one or 
more in-flight
+ * elements (referred to as the input {@link CommittedBundle bundle}), the 
following occurs
+ * atomically:
+ * <ul>
+ *  <li>All of the in-flight elements are removed from the collection of 
pending elements for the
+ *      {@link AppliedPTransform}.</li>
+ *  <li>All of the elements produced by the {@link AppliedPTransform} are 
added to the collection
+ *      of pending elements for each {@link AppliedPTransform} that consumes 
them.</li>
+ *  <li>The input watermark for the {@link AppliedPTransform} becomes the 
maximum value of
+ *    <ul>
+ *      <li>the previous input watermark</li>
+ *      <li>the minimum of
+ *        <ul>
+ *          <li>the timestamps of all currently pending elements</li>
+ *          <li>all input {@link PCollection} watermarks</li>
+ *        </ul>
+ *      </li>
+ *    </ul>
+ *  </li>
+ *  <li>The output watermark for the {@link AppliedPTransform} becomes the 
maximum of
+ *    <ul>
+ *      <li>the previous output watermark</li>
+ *      <li>the minimum of
+ *        <ul>
+ *          <li>the current input watermark</li>
+ *          <li>the current watermark holds</li>
+ *        </ul>
+ *      </li>
+ *    </ul>
+ *  </li>
+ *  <li>The watermark of the output {@link PCollection} can be advanced to the 
output watermark of
+ *      the {@link AppliedPTransform}</li>
+ *  <li>The watermark of all downstream {@link AppliedPTransform 
AppliedPTransforms} can be
+ *      advanced.</li>
+ * </ul>
+ *
+ * <p>The watermark of a {@link PCollection} is equal to the output watermark 
of the
+ * {@link AppliedPTransform} that produces it.
+ *
+ * <p>The watermarks for a {@link PTransform} are updated as follows when 
output is committed:<pre>
+ * Watermark_In'  = MAX(Watermark_In, MIN(U(TS_Pending), 
U(Watermark_InputPCollection)))
+ * Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(StateHold)))
+ * Watermark_PCollection = Watermark_Out_ProducingPTransform
+ * </pre>
+ */
+public class InMemoryWatermarkManager {
+  /**
+   * The watermark of some {@link Pipeline} element, usually a {@link 
PTransform} or a
+   * {@link PCollection}.
+   *
+   * <p>A watermark is a monotonically increasing value, which represents the 
point up to which the
+   * system believes it has received all of the data. Data that arrives with a 
timestamp that is
+   * before the watermark is considered late. {@link 
BoundedWindow#TIMESTAMP_MAX_VALUE} is a special
+   * timestamp which indicates we have received all of the data and there will 
be no more on-time or
+   * late data. This value is represented by {@link 
InMemoryWatermarkManager#THE_END_OF_TIME}.
+   */
+  private static interface Watermark {
+    /**
+     * Returns the current value of this watermark.
+     */
+    Instant get();
+
+    /**
+     * Refreshes the value of this watermark from its input watermarks and 
watermark holds.
+     *
+     * @return {@link WatermarkUpdate#ADVANCED} if the value of the watermark has advanced (and
+     *         thus dependent watermarks must also be updated), or
+     *         {@link WatermarkUpdate#NO_CHANGE} otherwise
+     */
+    WatermarkUpdate refresh();
+  }
+
+  /**
+   * The result of computing a {@link Watermark}.
+   */
+  private static enum WatermarkUpdate {
+    /** The watermark is later than the value at the previous time it was 
computed. */
+    ADVANCED(true),
+    /** The watermark is equal to the value at the previous time it was 
computed. */
+    NO_CHANGE(false);
+
+    private final boolean advanced;
+
+    private WatermarkUpdate(boolean advanced) {
+      this.advanced = advanced;
+    }
+
+    public boolean isAdvanced() {
+      return advanced;
+    }
+
+    /**
+     * Returns the {@link WatermarkUpdate} that is a result of combining the 
two watermark updates.
+     *
+     * <p>If either of the input {@link WatermarkUpdate WatermarkUpdates} is {@link #ADVANCED},
+     * the result is {@link #ADVANCED}; otherwise the result is {@link #NO_CHANGE}.
+     */
+    public WatermarkUpdate union(WatermarkUpdate that) {
+      if (this.advanced) {
+        return this;
+      }
+      return that;
+    }
+
+    /**
+     * Returns the {@link WatermarkUpdate} based on the former and current
+     * {@link Instant timestamps}.
+     */
+    public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant 
currentTime) {
+      if (currentTime.isAfter(oldTime)) {
+        return ADVANCED;
+      }
+      return NO_CHANGE;
+    }
+  }
+
+  /**
+   * The input {@link Watermark} of an {@link AppliedPTransform}.
+   *
+   * <p>At any point, the value of an {@link AppliedPTransformInputWatermark} 
is equal to the
+   * minimum watermark across all of its input {@link Watermark Watermarks}, 
and the minimum
+   * timestamp of all of the pending elements, restricted to be monotonically 
increasing.
+   *
+   * <p>See {@link #refresh()} for more information.
+   */
+  private static class AppliedPTransformInputWatermark implements Watermark {
+    private final Collection<? extends Watermark> inputWatermarks;
+    private final SortedMultiset<WindowedValue<?>> pendingElements;
+    private final Map<Object, NavigableSet<TimerData>> objectTimers;
+
+    private AtomicReference<Instant> currentWatermark;
+
+    public AppliedPTransformInputWatermark(Collection<? extends Watermark> 
inputWatermarks) {
+      this.inputWatermarks = inputWatermarks;
+      this.pendingElements = TreeMultiset.create(new 
WindowedValueByTimestampComparator());
+      this.objectTimers = new HashMap<>();
+      currentWatermark = new 
AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    @Override
+    public Instant get() {
+      return currentWatermark.get();
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * <p>When refresh is called, the value of the {@link 
AppliedPTransformInputWatermark} becomes
+     * equal to the maximum value of
+     * <ul>
+     *   <li>the previous input watermark</li>
+     *   <li>the minimum of
+     *     <ul>
+     *       <li>the timestamps of all currently pending elements</li>
+     *       <li>all input {@link PCollection} watermarks</li>
+     *     </ul>
+     *   </li>
+     * </ul>
+     */
+    @Override
+    public synchronized WatermarkUpdate refresh() {
+      Instant oldWatermark = currentWatermark.get();
+      Instant minInputWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      for (Watermark inputWatermark : inputWatermarks) {
+        minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, 
inputWatermark.get());
+      }
+      if (!pendingElements.isEmpty()) {
+        minInputWatermark = INSTANT_ORDERING.min(
+            minInputWatermark, 
pendingElements.firstEntry().getElement().getTimestamp());
+      }
+      Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, 
minInputWatermark);
+      currentWatermark.set(newWatermark);
+      return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
+    }
+
+    private synchronized void addPendingElements(Iterable<? extends 
WindowedValue<?>> newPending) {
+      for (WindowedValue<?> pendingElement : newPending) {
+        pendingElements.add(pendingElement);
+      }
+    }
+
+    private synchronized void removePendingElements(
+        Iterable<? extends WindowedValue<?>> finishedElements) {
+      for (WindowedValue<?> finishedElement : finishedElements) {
+        pendingElements.remove(finishedElement);
+      }
+    }
+
+    private synchronized void updateTimers(TimerUpdate update) {
+      NavigableSet<TimerData> keyTimers = objectTimers.get(update.key);
+      if (keyTimers == null) {
+        keyTimers = new TreeSet<>();
+        objectTimers.put(update.key, keyTimers);
+      }
+      for (TimerData timer : update.setTimers) {
+        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+          keyTimers.add(timer);
+        }
+      }
+      for (TimerData timer : update.deletedTimers) {
+        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+          keyTimers.remove(timer);
+        }
+      }
+      // We don't keep references to timers that have been fired and delivered 
via #getFiredTimers()
+    }
+
+    private synchronized Map<Object, List<TimerData>> 
extractFiredEventTimeTimers() {
+      return extractFiredTimers(currentWatermark.get(), objectTimers);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class)
+          .add("pendingElements", pendingElements)
+          .add("currentWatermark", currentWatermark)
+          .toString();
+    }
+  }
+
+  /**
+   * The output {@link Watermark} of an {@link AppliedPTransform}.
+   *
+   * <p>The value of an {@link AppliedPTransformOutputWatermark} is equal to 
the minimum of the
+   * current watermark hold and the {@link AppliedPTransformInputWatermark} 
for the same
+   * {@link AppliedPTransform}, restricted to be monotonically increasing. See
+   * {@link #refresh()} for more information.
+   */
+  private static class AppliedPTransformOutputWatermark implements Watermark {
+    private final Watermark inputWatermark;
+    private final PerKeyHolds holds;
+    private AtomicReference<Instant> currentWatermark;
+
+    public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark 
inputWatermark) {
+      this.inputWatermark = inputWatermark;
+      holds = new PerKeyHolds();
+      currentWatermark = new 
AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    public synchronized void updateHold(Object key, Instant newHold) {
+      if (newHold == null) {
+        holds.removeHold(key);
+      } else {
+        holds.updateHold(key, newHold);
+      }
+    }
+
+    @Override
+    public Instant get() {
+      return currentWatermark.get();
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * <p>When refresh is called, the value of the {@link 
AppliedPTransformOutputWatermark} becomes
+     * equal to the maximum value of:
+     * <ul>
+     *   <li>the previous output watermark</li>
+     *   <li>the minimum of
+     *     <ul>
+     *       <li>the current input watermark</li>
+     *       <li>the current watermark holds</li>
+     *     </ul>
+     *   </li>
+     * </ul>
+     */
+    @Override
+    public synchronized WatermarkUpdate refresh() {
+      Instant oldWatermark = currentWatermark.get();
+      Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), 
holds.getMinHold());
+      newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
+      currentWatermark.set(newWatermark);
+      return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class)
+          .add("holds", holds)
+          .add("currentWatermark", currentWatermark)
+          .toString();
+    }
+  }
+
+  /**
+   * The input {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
+   * {@link AppliedPTransform}.
+   *
+   * <p>At any point, the hold value of an {@link 
SynchronizedProcessingTimeInputWatermark} is equal
+   * to the minimum across all pending bundles at the {@link 
AppliedPTransform} and all upstream
+   * {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of 
the input
+   * synchronized processing time at any step is equal to the maximum of:
+   * <ul>
+   *   <li>The most recently returned synchronized processing input time
+   *   <li>The minimum of
+   *     <ul>
+   *       <li>The current processing time
+   *       <li>The current synchronized processing time input hold
+   *     </ul>
+   * </ul>
+   */
+  private static class SynchronizedProcessingTimeInputWatermark implements 
Watermark {
+    private final Collection<? extends Watermark> inputWms;
+    private final Collection<CommittedBundle<?>> pendingBundles;
+    private final Map<Object, NavigableSet<TimerData>> processingTimers;
+    private final Map<Object, NavigableSet<TimerData>> 
synchronizedProcessingTimers;
+
+    private final PriorityQueue<TimerData> pendingTimers;
+
+    private AtomicReference<Instant> earliestHold;
+
+    public SynchronizedProcessingTimeInputWatermark(Collection<? extends 
Watermark> inputWms) {
+      this.inputWms = inputWms;
+      this.pendingBundles = new HashSet<>();
+      this.processingTimers = new HashMap<>();
+      this.synchronizedProcessingTimers = new HashMap<>();
+      this.pendingTimers = new PriorityQueue<>();
+      Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      for (Watermark wm : inputWms) {
+        initialHold = INSTANT_ORDERING.min(initialHold, wm.get());
+      }
+      earliestHold = new AtomicReference<>(initialHold);
+    }
+
+    @Override
+    public Instant get() {
+      return earliestHold.get();
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * <p>When refresh is called, the value of the {@link 
SynchronizedProcessingTimeInputWatermark}
+     * becomes equal to the minimum value of
+     * <ul>
+     *   <li>the timestamps of all currently pending bundles</li>
+     *   <li>all input {@link PCollection} synchronized processing time 
watermarks</li>
+     * </ul>
+     *
+     * <p>Note that this value is not monotonic, but the returned value for 
the synchronized
+     * processing time must be.
+     */
+    @Override
+    public synchronized WatermarkUpdate refresh() {
+      Instant oldHold = earliestHold.get();
+      Instant minTime = THE_END_OF_TIME.get();
+      for (Watermark input : inputWms) {
+        minTime = INSTANT_ORDERING.min(minTime, input.get());
+      }
+      for (CommittedBundle<?> bundle : pendingBundles) {
+        // TODO: Track elements in the bundle by the processing time they were 
output instead of
+        // entire bundles. Required to support arbitrarily splitting and 
merging bundles between
+        // steps
+        minTime = INSTANT_ORDERING.min(minTime, 
bundle.getSynchronizedProcessingOutputWatermark());
+      }
+      earliestHold.set(minTime);
+      return WatermarkUpdate.fromTimestamps(oldHold, minTime);
+    }
+
+    public synchronized void addPending(CommittedBundle<?> bundle) {
+      pendingBundles.add(bundle);
+    }
+
+    public synchronized void removePending(CommittedBundle<?> bundle) {
+      pendingBundles.remove(bundle);
+    }
+
+    /**
+     * Returns the timestamp of the earliest timer that has not been completed. This is the
+     * minimum of the earliest timestamp across timers that have not yet fired and the earliest
+     * timestamp across timers that have been delivered but have not been completed. If there are
+     * no such timers, {@link BoundedWindow#TIMESTAMP_MAX_VALUE} is returned.
+     */
+    public synchronized Instant getEarliestTimerTimestamp() {
+      Instant earliest = THE_END_OF_TIME.get();
+      for (NavigableSet<TimerData> timers : processingTimers.values()) {
+        if (!timers.isEmpty()) {
+          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), 
earliest);
+        }
+      }
+      for (NavigableSet<TimerData> timers : 
synchronizedProcessingTimers.values()) {
+        if (!timers.isEmpty()) {
+          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), 
earliest);
+        }
+      }
+      if (!pendingTimers.isEmpty()) {
+        earliest = INSTANT_ORDERING.min(pendingTimers.peek().getTimestamp(), 
earliest);
+      }
+      return earliest;
+    }
+
+    private synchronized void updateTimers(TimerUpdate update) {
+      for (TimerData completedTimer : update.completedTimers) {
+        pendingTimers.remove(completedTimer);
+      }
+      Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
+      for (TimerData addedTimer : update.setTimers) {
+        NavigableSet<TimerData> timerQueue = 
timerMap.get(addedTimer.getDomain());
+        if (timerQueue != null) {
+          timerQueue.add(addedTimer);
+        }
+      }
+      for (TimerData deletedTimer : update.deletedTimers) {
+        NavigableSet<TimerData> timerQueue = 
timerMap.get(deletedTimer.getDomain());
+        if (timerQueue != null) {
+          timerQueue.remove(deletedTimer);
+        }
+      }
+    }
+
+    private synchronized Map<Object, List<TimerData>> extractFiredDomainTimers(
+        TimeDomain domain, Instant firingTime) {
+      Map<Object, List<TimerData>> firedTimers;
+      switch (domain) {
+        case PROCESSING_TIME:
+          firedTimers = extractFiredTimers(firingTime, processingTimers);
+          break;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          firedTimers =
+              extractFiredTimers(
+                  INSTANT_ORDERING.min(firingTime, earliestHold.get()),
+                  synchronizedProcessingTimers);
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Called getFiredTimers on a Synchronized Processing Time 
watermark"
+                  + " and gave a non-processing time domain "
+                  + domain);
+      }
+      for (Map.Entry<Object, ? extends Collection<TimerData>> firedTimer : 
firedTimers.entrySet()) {
+        pendingTimers.addAll(firedTimer.getValue());
+      }
+      return firedTimers;
+    }
+
+    private Map<TimeDomain, NavigableSet<TimerData>> timerMap(Object key) {
+      NavigableSet<TimerData> processingQueue = processingTimers.get(key);
+      if (processingQueue == null) {
+        processingQueue = new TreeSet<>();
+        processingTimers.put(key, processingQueue);
+      }
+      NavigableSet<TimerData> synchronizedProcessingQueue =
+          synchronizedProcessingTimers.get(key);
+      if (synchronizedProcessingQueue == null) {
+        synchronizedProcessingQueue = new TreeSet<>();
+        synchronizedProcessingTimers.put(key, synchronizedProcessingQueue);
+      }
+      EnumMap<TimeDomain, NavigableSet<TimerData>> result = new 
EnumMap<>(TimeDomain.class);
+      result.put(TimeDomain.PROCESSING_TIME, processingQueue);
+      result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, 
synchronizedProcessingQueue);
+      return result;
+    }
+
+    @Override
+    public synchronized String toString() {
+      return 
MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class)
+          .add("earliestHold", earliestHold)
+          .toString();
+    }
+  }
+
+  /**
+   * The output {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
+   * {@link AppliedPTransform}.
+   *
+   * <p>At any point, the hold value of an {@link 
SynchronizedProcessingTimeOutputWatermark} is
+   * equal to the minimum across all incomplete timers at the {@link 
AppliedPTransform} and all
+   * upstream {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The 
value of the output
+   * synchronized processing time at any step is equal to the maximum of:
+   * <ul>
+   *   <li>The most recently returned synchronized processing output time
+   *   <li>The minimum of
+   *     <ul>
+   *       <li>The current processing time
+   *       <li>The current synchronized processing time output hold
+   *     </ul>
+   * </ul>
+   */
+  private static class SynchronizedProcessingTimeOutputWatermark implements 
Watermark {
+    private final SynchronizedProcessingTimeInputWatermark inputWm;
+    private AtomicReference<Instant> latestRefresh;
+
+    public SynchronizedProcessingTimeOutputWatermark(
+        SynchronizedProcessingTimeInputWatermark inputWm) {
+      this.inputWm = inputWm;
+      this.latestRefresh = new 
AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    @Override
+    public Instant get() {
+      return latestRefresh.get();
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * <p>When refresh is called, the value of the {@link 
SynchronizedProcessingTimeOutputWatermark}
+     * becomes equal to the minimum value of:
+     * <ul>
+     *   <li>the current input watermark.
+     *   <li>all {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} timers that 
are based on the input
+     *       watermark.
+     *   <li>all {@link TimeDomain#PROCESSING_TIME} timers that are based on 
the input watermark.
+     * </ul>
+     *
+     * <p>Note that this value is not monotonic, but the returned value for 
the synchronized
+     * processing time must be.
+     */
+    @Override
+    public synchronized WatermarkUpdate refresh() {
+      // Hold the output synchronized processing time to the input watermark, 
which takes into
+      // account buffered bundles, and the earliest pending timer, which 
determines what to hold
+      // downstream timers to.
+      Instant oldRefresh = latestRefresh.get();
+      Instant newTimestamp =
+          INSTANT_ORDERING.min(inputWm.get(), 
inputWm.getEarliestTimerTimestamp());
+      latestRefresh.set(newTimestamp);
+      return WatermarkUpdate.fromTimestamps(oldRefresh, newTimestamp);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return 
MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
+          .add("latestRefresh", latestRefresh)
+          .toString();
+    }
+  }
+
+  /**
+   * The {@code Watermark} that is after the latest time it is possible to 
represent in the global
+   * window. This is a distinguished value representing a complete {@link 
PTransform}.
+   */
+  private static final Watermark THE_END_OF_TIME = new Watermark() {
+        @Override
+        public WatermarkUpdate refresh() {
+          // THE_END_OF_TIME is a distinguished value that cannot be advanced.
+          return WatermarkUpdate.NO_CHANGE;
+        }
+
+        @Override
+        public Instant get() {
+          return BoundedWindow.TIMESTAMP_MAX_VALUE;
+        }
+      };
+
+  private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
+
+  /**
+   * A function that takes a WindowedValue and returns the exploded 
representation of that
+   * {@link WindowedValue}.
+   */
+  private static final Function<WindowedValue<?>, ? extends Iterable<? extends 
WindowedValue<?>>>
+      EXPLODE_WINDOWS_FN =
+          new Function<WindowedValue<?>, Iterable<? extends 
WindowedValue<?>>>() {
+            @Override
+            public Iterable<? extends WindowedValue<?>> apply(WindowedValue<?> 
input) {
+              return input.explodeWindows();
+            }
+          };
+
+  /**
+   * For each (Object, NavigableSet) pair in the provided map, remove each timer that is strictly
+   * before the latestTime argument and put it in the result with the same key, then remove all
+   * of the keys which have no more pending timers.
+   *
+   * <p>The result collection retains ordering of timers (from earliest to latest).
+   */
+  private static Map<Object, List<TimerData>> extractFiredTimers(
+      Instant latestTime, Map<Object, NavigableSet<TimerData>> objectTimers) {
+    Map<Object, List<TimerData>> result = new HashMap<>();
+    Set<Object> emptyKeys = new HashSet<>();
+    for (Map.Entry<Object, NavigableSet<TimerData>> pendingTimers : 
objectTimers.entrySet()) {
+      NavigableSet<TimerData> timers = pendingTimers.getValue();
+      if (!timers.isEmpty() && 
timers.first().getTimestamp().isBefore(latestTime)) {
+        ArrayList<TimerData> keyFiredTimers = new ArrayList<>();
+        result.put(pendingTimers.getKey(), keyFiredTimers);
+        while (!timers.isEmpty() && 
timers.first().getTimestamp().isBefore(latestTime)) {
+          keyFiredTimers.add(timers.first());
+          timers.remove(timers.first());
+        }
+      }
+      if (timers.isEmpty()) {
+        emptyKeys.add(pendingTimers.getKey());
+      }
+    }
+    objectTimers.keySet().removeAll(emptyKeys);
+    return result;
+  }
+
+  
////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * The {@link Clock} providing the current time in the {@link 
TimeDomain#PROCESSING_TIME} domain.
+   */
+  private final Clock clock;
+
+  /**
+   * A map from each {@link PCollection} to all {@link AppliedPTransform 
PTransform applications}
+   * that consume that {@link PCollection}.
+   */
+  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers;
+
+  /**
+   * The input and output watermark of each {@link AppliedPTransform}.
+   */
+  private final Map<AppliedPTransform<?, ?, ?>, TransformWatermarks> 
transformToWatermarks;
+
+  /**
+   * Creates a new {@link InMemoryWatermarkManager}. All watermarks within the 
newly created
+   * {@link InMemoryWatermarkManager} start at {@link 
BoundedWindow#TIMESTAMP_MIN_VALUE}, the
+   * minimum watermark, with no watermark holds or pending elements.
+   *
+   * @param clock the {@link Clock} providing the current time in the processing time domain
+   * @param rootTransforms the root-level transforms of the {@link Pipeline}
+   * @param consumers a mapping between each {@link PCollection} in the {@link 
Pipeline} to the
+   *                  transforms that consume it as a part of their input
+   */
+  public static InMemoryWatermarkManager create(
+      Clock clock,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
+    return new InMemoryWatermarkManager(clock, rootTransforms, consumers);
+  }
+
+  private InMemoryWatermarkManager(
+      Clock clock,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
+    this.clock = clock;
+    this.consumers = consumers;
+
+    transformToWatermarks = new HashMap<>();
+
+    for (AppliedPTransform<?, ?, ?> rootTransform : rootTransforms) {
+      getTransformWatermark(rootTransform);
+    }
+    for (Collection<AppliedPTransform<?, ?, ?>> intermediateTransforms : 
consumers.values()) {
+      for (AppliedPTransform<?, ?, ?> transform : intermediateTransforms) {
+        getTransformWatermark(transform);
+      }
+    }
+  }
+
+  private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> 
transform) {
+    TransformWatermarks wms = transformToWatermarks.get(transform);
+    if (wms == null) {
+      List<Watermark> inputCollectionWatermarks = 
getInputWatermarks(transform);
+      AppliedPTransformInputWatermark inputWatermark =
+          new AppliedPTransformInputWatermark(inputCollectionWatermarks);
+      AppliedPTransformOutputWatermark outputWatermark =
+          new AppliedPTransformOutputWatermark(inputWatermark);
+
+      SynchronizedProcessingTimeInputWatermark inputProcessingWatermark =
+          new 
SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(transform));
+      SynchronizedProcessingTimeOutputWatermark outputProcessingWatermark =
+          new 
SynchronizedProcessingTimeOutputWatermark(inputProcessingWatermark);
+
+      wms =
+          new TransformWatermarks(
+              inputWatermark, outputWatermark, inputProcessingWatermark, 
outputProcessingWatermark);
+      transformToWatermarks.put(transform, wms);
+    }
+    return wms;
+  }
+
+  private Collection<Watermark> getInputProcessingWatermarks(
+      AppliedPTransform<?, ?, ?> transform) {
+    ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
+    Collection<? extends PValue> inputs = transform.getInput().expand();
+    if (inputs.isEmpty()) {
+      inputWmsBuilder.add(THE_END_OF_TIME);
+    }
+    for (PValue pvalue : inputs) {
+      Watermark producerOutputWatermark =
+          getTransformWatermark(pvalue.getProducingTransformInternal())
+              .synchronizedProcessingOutputWatermark;
+      inputWmsBuilder.add(producerOutputWatermark);
+    }
+    return inputWmsBuilder.build();
+  }
+
+  private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> 
transform) {
+    ImmutableList.Builder<Watermark> inputWatermarksBuilder = 
ImmutableList.builder();
+    Collection<? extends PValue> inputs = transform.getInput().expand();
+    if (inputs.isEmpty()) {
+      inputWatermarksBuilder.add(THE_END_OF_TIME);
+    }
+    for (PValue pvalue : inputs) {
+      Watermark producerOutputWatermark =
+          
getTransformWatermark(pvalue.getProducingTransformInternal()).outputWatermark;
+      inputWatermarksBuilder.add(producerOutputWatermark);
+    }
+    List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
+    return inputCollectionWatermarks;
+  }
+
+  
////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Gets the input and output watermarks for an {@link AppliedPTransform}. If 
the
+   * {@link AppliedPTransform PTransform} has not processed any elements, 
return a watermark of
+   * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
+   *
+   * @return a snapshot of the input watermark and output watermark for the 
provided transform
+   */
+  public TransformWatermarks getWatermarks(AppliedPTransform<?, ?, ?> 
transform) {
+    return transformToWatermarks.get(transform);
+  }
+
+  /**
+   * Updates the watermarks of a transform with one or more inputs.
+   *
+   * <p>Each transform has two monotonically increasing watermarks. The input
+   * watermark can, at any time, be updated to equal:
+   * <pre>
+   * MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
+   * </pre>
+   * The output watermark can, at any time, be updated to equal:
+   * <pre>
+   * MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
+   * </pre>
+   *
+   * <p>This method first updates the pending elements and timers, then records
+   * the event-time hold, and finally refreshes the transform's watermarks.
+   *
+   * @param completed the input that has completed, or {@code null} if there
+   *                  was no input bundle
+   * @param transform the transform that has completed processing the input
+   * @param timerUpdate the timers completed, set and deleted by the transform
+   * @param outputs the bundles the transform has output
+   * @param earliestHold the earliest watermark hold in the transform's state.
+   *                     {@code null} if there is no hold
+   */
+  public void updateWatermarks(
+      @Nullable CommittedBundle<?> completed,
+      AppliedPTransform<?, ?, ?> transform,
+      TimerUpdate timerUpdate,
+      Iterable<? extends CommittedBundle<?>> outputs,
+      @Nullable Instant earliestHold) {
+    updatePending(completed, transform, timerUpdate, outputs);
+
+    TransformWatermarks updated = transformToWatermarks.get(transform);
+    // Without a completed input there is no key; the hold is then recorded
+    // under the null key.
+    Object holdKey = null;
+    if (completed != null) {
+      holdKey = completed.getKey();
+    }
+    updated.setEventTimeHold(holdKey, earliestHold);
+
+    refreshWatermarks(transform);
+  }
+
+  /**
+   * Refreshes the watermarks of the provided transform and, if the refresh
+   * reports that a watermark advanced, recursively refreshes every transform
+   * registered in {@code consumers} for any of its outputs.
+   */
+  private void refreshWatermarks(AppliedPTransform<?, ?, ?> transform) {
+    WatermarkUpdate refreshed = transformToWatermarks.get(transform).refresh();
+    if (!refreshed.isAdvanced()) {
+      // Nothing moved, so nothing downstream can have changed either.
+      return;
+    }
+    for (PValue produced : transform.getOutput().expand()) {
+      Collection<AppliedPTransform<?, ?, ?>> downstream =
+          consumers.get(produced);
+      if (downstream == null) {
+        continue;
+      }
+      for (AppliedPTransform<?, ?, ?> consumer : downstream) {
+        refreshWatermarks(consumer);
+      }
+    }
+  }
+
+  /**
+   * Removes all of the completed Timers from the collection of pending timers,
+   * adds all new timers, and removes all deleted timers. Removes all elements
+   * consumed by the input bundle from the {@link PTransform PTransforms}
+   * collection of pending elements, and adds all elements produced by the
+   * {@link PTransform} to the pending queue of each consumer.
+   *
+   * @param input the bundle the transform consumed, or {@code null} if it
+   *              consumed none
+   * @param transform the transform that finished processing
+   * @param timerUpdate the timers completed, set and deleted by the transform
+   * @param outputs the bundles produced by the transform
+   */
+  private void updatePending(
+      @Nullable CommittedBundle<?> input,
+      AppliedPTransform<?, ?, ?> transform,
+      TimerUpdate timerUpdate,
+      Iterable<? extends CommittedBundle<?>> outputs) {
+    TransformWatermarks completedTransform =
+        transformToWatermarks.get(transform);
+    completedTransform.updateTimers(timerUpdate);
+    if (input != null) {
+      completedTransform.removePending(input);
+    }
+
+    for (CommittedBundle<?> bundle : outputs) {
+      Collection<AppliedPTransform<?, ?, ?>> bundleConsumers =
+          consumers.get(bundle.getPCollection());
+      if (bundleConsumers == null) {
+        // No transform is registered as consuming this PCollection, so there
+        // is nobody to hand the bundle to. refreshWatermarks guards the same
+        // lookup in the same way.
+        continue;
+      }
+      for (AppliedPTransform<?, ?, ?> consumer : bundleConsumers) {
+        TransformWatermarks watermarks = transformToWatermarks.get(consumer);
+        watermarks.addPending(bundle);
+      }
+    }
+  }
+
+  /**
+   * Extracts the fired timers of every tracked {@link PTransform}.
+   *
+   * <p>The result maps each transform that has at least one fired timer to its
+   * per-key {@link FiredTimers}; transforms without fired timers are omitted.
+   * The extracted timers are removed from this
+   * {@link InMemoryWatermarkManager}.
+   */
+  public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>>
+      extractFiredTimers() {
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firedByTransform =
+        new HashMap<>();
+    for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> tracked :
+        transformToWatermarks.entrySet()) {
+      Map<Object, FiredTimers> firedByKey =
+          tracked.getValue().extractFiredTimers();
+      if (firedByKey.isEmpty()) {
+        continue;
+      }
+      firedByTransform.put(tracked.getKey(), firedByKey);
+    }
+    return firedByTransform;
+  }
+
+  /**
+   * A (key, Instant) pair that holds the watermark. Holds are per-key, but the
+   * watermark is global, and as such the watermark manager must track holds
+   * and the release of holds on a per-key basis.
+   *
+   * <p>The {@link #compareTo(KeyedHold)} method of {@link KeyedHold} is not
+   * consistent with equals: holds are ordered by timestamp first, and ties are
+   * broken by an arbitrary ordering of the keys (null keys last) rather than by
+   * object equality.
+   */
+  private static final class KeyedHold implements Comparable<KeyedHold> {
+    private static final Ordering<Object> KEY_ORDERING =
+        Ordering.arbitrary().nullsLast();
+
+    private final Object key;
+    private final Instant timestamp;
+
+    /**
+     * Create a new KeyedHold with the specified key and timestamp. A
+     * {@code null} timestamp is replaced by {@code THE_END_OF_TIME.get()}.
+     */
+    public static KeyedHold of(Object key, Instant timestamp) {
+      Instant effectiveTimestamp =
+          MoreObjects.firstNonNull(timestamp, THE_END_OF_TIME.get());
+      return new KeyedHold(key, effectiveTimestamp);
+    }
+
+    private KeyedHold(Object key, Instant timestamp) {
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+
+    /**
+     * Get the value of this {@link KeyedHold}.
+     */
+    public Instant getTimestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public int compareTo(KeyedHold that) {
+      ComparisonChain chain = ComparisonChain.start();
+      chain = chain.compare(this.timestamp, that.timestamp);
+      chain = chain.compare(this.key, that.key, KEY_ORDERING);
+      return chain.result();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      // instanceof is false for null, so no separate null check is needed.
+      if (!(other instanceof KeyedHold)) {
+        return false;
+      }
+      KeyedHold that = (KeyedHold) other;
+      if (!Objects.equals(this.timestamp, that.timestamp)) {
+        return false;
+      }
+      return Objects.equals(this.key, that.key);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(timestamp, key);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(KeyedHold.class)
+          .add("key", key)
+          .add("hold", timestamp)
+          .toString();
+    }
+  }
+
+  /**
+   * Tracks at most one {@link KeyedHold} per key and exposes the minimum hold
+   * across all keys.
+   *
+   * <p>{@code keyedHolds} maps each key to its current hold, while
+   * {@code allHolds} orders the same holds so the minimum can be read from the
+   * head of the queue. The two structures are always updated together.
+   */
+  private static class PerKeyHolds {
+    private final Map<Object, KeyedHold> keyedHolds;
+    private final PriorityQueue<KeyedHold> allHolds;
+
+    private PerKeyHolds() {
+      this.keyedHolds = new HashMap<>();
+      this.allHolds = new PriorityQueue<>();
+    }
+
+    /**
+     * Gets the minimum hold across all keys in this {@link PerKeyHolds}, or
+     * THE_END_OF_TIME if there are no holds within this {@link PerKeyHolds}.
+     */
+    public Instant getMinHold() {
+      return allHolds.isEmpty()
+          ? THE_END_OF_TIME.get()
+          : allHolds.peek().getTimestamp();
+    }
+
+    /**
+     * Updates the hold of the provided key to the provided value, removing any
+     * other holds for the same key.
+     *
+     * @param key the key the hold belongs to; may be {@code null}
+     * @param newHold the new hold; {@link KeyedHold#of} replaces a
+     *                {@code null} hold with {@code THE_END_OF_TIME.get()}
+     */
+    public void updateHold(@Nullable Object key, Instant newHold) {
+      removeHold(key);
+      KeyedHold newKeyedHold = KeyedHold.of(key, newHold);
+      keyedHolds.put(key, newKeyedHold);
+      allHolds.offer(newKeyedHold);
+    }
+
+    /**
+     * Removes the hold of the provided key, if there is one.
+     */
+    public void removeHold(Object key) {
+      // Remove from the map as well as from the queue; otherwise the map keeps
+      // a stale entry for a hold that no longer exists.
+      KeyedHold oldHold = keyedHolds.remove(key);
+      if (oldHold != null) {
+        allHolds.remove(oldHold);
+      }
+    }
+  }
+
+  /**
+   * A reference to the input and output watermarks of an {@link 
AppliedPTransform}.
+   */
+  public class TransformWatermarks {
+    private final AppliedPTransformInputWatermark inputWatermark;
+    private final AppliedPTransformOutputWatermark outputWatermark;
+
+    private final SynchronizedProcessingTimeInputWatermark 
synchronizedProcessingInputWatermark;
+    private final SynchronizedProcessingTimeOutputWatermark 
synchronizedProcessingOutputWatermark;
+
+    private Instant latestSynchronizedInputWm;
+    private Instant latestSynchronizedOutputWm;
+
+    private TransformWatermarks(
+        AppliedPTransformInputWatermark inputWatermark,
+        AppliedPTransformOutputWatermark outputWatermark,
+        SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark,
+        SynchronizedProcessingTimeOutputWatermark 
outputSynchProcessingWatermark) {
+      this.inputWatermark = inputWatermark;
+      this.outputWatermark = outputWatermark;
+
+      this.synchronizedProcessingInputWatermark = 
inputSynchProcessingWatermark;
+      this.synchronizedProcessingOutputWatermark = 
outputSynchProcessingWatermark;
+      this.latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      this.latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    /**
+     * Returns the input watermark of the {@link AppliedPTransform}.
+     */
+    public Instant getInputWatermark() {
+      return Preconditions.checkNotNull(inputWatermark.get());
+    }
+
+    /**
+     * Returns the output watermark of the {@link AppliedPTransform}.
+     */
+    public Instant getOutputWatermark() {
+      return outputWatermark.get();
+    }
+
+    /**
+     * Returns the synchronized processing input time of the {@link 
AppliedPTransform}.
+     *
+     * <p>The returned value is guaranteed to be monotonically increasing, and 
outside of the
+     * presence of holds, will increase as the system time progresses.
+     */
+    public synchronized Instant getSynchronizedProcessingInputTime() {
+      latestSynchronizedInputWm = INSTANT_ORDERING.max(
+          latestSynchronizedInputWm,
+          INSTANT_ORDERING.min(clock.now(), 
synchronizedProcessingInputWatermark.get()));
+      return latestSynchronizedInputWm;
+    }
+
+    /**
+     * Returns the synchronized processing output time of the {@link 
AppliedPTransform}.
+     *
+     * <p>The returned value is guaranteed to be monotonically increasing, and 
outside of the
+     * presence of holds, will increase as the system time progresses.
+     */
+    public synchronized Instant getSynchronizedProcessingOutputTime() {
+      latestSynchronizedOutputWm = INSTANT_ORDERING.max(
+          latestSynchronizedOutputWm,
+          INSTANT_ORDERING.min(clock.now(), 
synchronizedProcessingOutputWatermark.get()));
+      return latestSynchronizedOutputWm;
+    }
+
+    private WatermarkUpdate refresh() {
+      inputWatermark.refresh();
+      synchronizedProcessingInputWatermark.refresh();
+      WatermarkUpdate eventOutputUpdate = outputWatermark.refresh();
+      WatermarkUpdate syncOutputUpdate = 
synchronizedProcessingOutputWatermark.refresh();
+      return eventOutputUpdate.union(syncOutputUpdate);
+    }
+
+    private void setEventTimeHold(Object key, Instant newHold) {
+      outputWatermark.updateHold(key, newHold);
+    }
+
+    private void removePending(CommittedBundle<?> bundle) {
+      inputWatermark.removePendingElements(elementsFromBundle(bundle));
+      synchronizedProcessingInputWatermark.removePending(bundle);
+    }
+
+    private void addPending(CommittedBundle<?> bundle) {
+      inputWatermark.addPendingElements(elementsFromBundle(bundle));
+      synchronizedProcessingInputWatermark.addPending(bundle);
+    }
+
+    private Iterable<? extends WindowedValue<?>> 
elementsFromBundle(CommittedBundle<?> bundle) {
+      return 
FluentIterable.from(bundle.getElements()).transformAndConcat(EXPLODE_WINDOWS_FN);
+    }
+
+    private Map<Object, FiredTimers> extractFiredTimers() {
+      Map<Object, List<TimerData>> eventTimeTimers = 
inputWatermark.extractFiredEventTimeTimers();
+      Map<Object, List<TimerData>> processingTimers;
+      Map<Object, List<TimerData>> synchronizedTimers;
+      if (inputWatermark.get().equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+        processingTimers = 
synchronizedProcessingInputWatermark.extractFiredDomainTimers(
+            TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
+        synchronizedTimers = 
synchronizedProcessingInputWatermark.extractFiredDomainTimers(
+            TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
+      } else {
+        processingTimers = 
synchronizedProcessingInputWatermark.extractFiredDomainTimers(
+            TimeDomain.PROCESSING_TIME, clock.now());
+        synchronizedTimers = 
synchronizedProcessingInputWatermark.extractFiredDomainTimers(
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME, 
getSynchronizedProcessingInputTime());
+      }
+      Map<Object, Map<TimeDomain, List<TimerData>>> groupedTimers = new 
HashMap<>();
+      groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, 
synchronizedTimers);
+
+      Map<Object, FiredTimers> keyFiredTimers = new HashMap<>();
+      for (Map.Entry<Object, Map<TimeDomain, List<TimerData>>> firedTimers :
+          groupedTimers.entrySet()) {
+        keyFiredTimers.put(firedTimers.getKey(), new 
FiredTimers(firedTimers.getValue()));
+      }
+      return keyFiredTimers;
+    }
+
+    @SafeVarargs
+    private final void groupFiredTimers(
+        Map<Object, Map<TimeDomain, List<TimerData>>> groupedToMutate,
+        Map<Object, List<TimerData>>... timersToGroup) {
+      for (Map<Object, List<TimerData>> subGroup : timersToGroup) {
+        for (Map.Entry<Object, List<TimerData>> newTimers : 
subGroup.entrySet()) {
+          Map<TimeDomain, List<TimerData>> grouped = 
groupedToMutate.get(newTimers.getKey());
+          if (grouped == null) {
+            grouped = new HashMap<>();
+            groupedToMutate.put(newTimers.getKey(), grouped);
+          }
+          grouped.put(newTimers.getValue().get(0).getDomain(), 
newTimers.getValue());
+        }
+      }
+    }
+
+    private void updateTimers(TimerUpdate update) {
+      inputWatermark.updateTimers(update);
+      synchronizedProcessingInputWatermark.updateTimers(update);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(TransformWatermarks.class)
+          .add("inputWatermark", inputWatermark)
+          .add("outputWatermark", outputWatermark)
+          .add("inputProcessingTime", synchronizedProcessingInputWatermark)
+          .add("outputProcessingTime", synchronizedProcessingOutputWatermark)
+          .toString();
+    }
+  }
+
+  /**
+   * A collection of newly set, deleted, and completed timers.
+   *
+   * <p>setTimers and deletedTimers are collections of {@link TimerData} that 
have been added to the
+   * {@link TimerInternals} of an executed step. completedTimers are timers 
that were delivered as
+   * the input to the executed step.
+   */
+  public static class TimerUpdate {
+    private final Object key;
+    private final Iterable<? extends TimerData> completedTimers;
+
+    private final Iterable<? extends TimerData> setTimers;
+    private final Iterable<? extends TimerData> deletedTimers;
+
+    /**
+     * Returns a TimerUpdate for a null key with no timers.
+     */
+    public static TimerUpdate empty() {
+      return new TimerUpdate(
+          null,
+          Collections.<TimerData>emptyList(),
+          Collections.<TimerData>emptyList(),
+          Collections.<TimerData>emptyList());
+    }
+
+    /**
+     * Creates a new {@link TimerUpdate} builder with the provided completed 
timers that needs the
+     * set and deleted timers to be added to it.
+     */
+    public static TimerUpdateBuilder builder(Object key) {
+      return new TimerUpdateBuilder(key);
+    }
+
+    /**
+     * A {@link TimerUpdate} builder that needs to be provided with set timers 
and deleted timers.
+     */
+    public static final class TimerUpdateBuilder {
+      private final Object key;
+      private final Collection<TimerData> completedTimers;
+      private final Collection<TimerData> setTimers;
+      private final Collection<TimerData> deletedTimers;
+
+      private TimerUpdateBuilder(Object key) {
+        this.key = key;
+        this.completedTimers = new HashSet<>();
+        this.setTimers = new HashSet<>();
+        this.deletedTimers = new HashSet<>();
+      }
+
+      /**
+       * Adds all of the provided timers to the collection of completed 
timers, and returns this
+       * {@link TimerUpdateBuilder}.
+       */
+      public TimerUpdateBuilder withCompletedTimers(Iterable<TimerData> 
completedTimers) {
+        Iterables.addAll(this.completedTimers, completedTimers);
+        return this;
+      }
+
+      /**
+       * Adds the provided timer to the collection of set timers, removing it 
from deleted timers if
+       * it has previously been deleted. Returns this {@link 
TimerUpdateBuilder}.
+       */
+      public TimerUpdateBuilder setTimer(TimerData setTimer) {
+        deletedTimers.remove(setTimer);
+        setTimers.add(setTimer);
+        return this;
+      }
+
+      /**
+       * Adds the provided timer to the collection of deleted timers, removing 
it from set timers if
+       * it has previously been set. Returns this {@link TimerUpdateBuilder}.
+       */
+      public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) {
+        deletedTimers.add(deletedTimer);
+        setTimers.remove(deletedTimer);
+        return this;
+      }
+
+      /**
+       * Returns a new {@link TimerUpdate} with the most recently set 
completedTimers, setTimers,
+       * and deletedTimers.
+       */
+      public TimerUpdate build() {
+        return new TimerUpdate(
+            key,
+            ImmutableSet.copyOf(completedTimers),
+            ImmutableSet.copyOf(setTimers),
+            ImmutableSet.copyOf(deletedTimers));
+      }
+    }
+
+    private TimerUpdate(
+        Object key,
+        Iterable<? extends TimerData> completedTimers,
+        Iterable<? extends TimerData> setTimers,
+        Iterable<? extends TimerData> deletedTimers) {
+      this.key = key;
+      this.completedTimers = completedTimers;
+      this.setTimers = setTimers;
+      this.deletedTimers = deletedTimers;
+    }
+
+    @VisibleForTesting
+    Object getKey() {
+      return key;
+    }
+
+    @VisibleForTesting
+    Iterable<? extends TimerData> getCompletedTimers() {
+      return completedTimers;
+    }
+
+    @VisibleForTesting
+    Iterable<? extends TimerData> getSetTimers() {
+      return setTimers;
+    }
+
+    @VisibleForTesting
+    Iterable<? extends TimerData> getDeletedTimers() {
+      return deletedTimers;
+    }
+
+    /**
+     * Returns a {@link TimerUpdate} that is like this one, but with the 
specified completed timers.
+     */
+    public TimerUpdate withCompletedTimers(Iterable<TimerData> 
completedTimers) {
+      return new TimerUpdate(this.key, completedTimers, setTimers, 
deletedTimers);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(key, completedTimers, setTimers, deletedTimers);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || !(other instanceof TimerUpdate)) {
+        return false;
+      }
+      TimerUpdate that = (TimerUpdate) other;
+      return Objects.equals(this.key, that.key)
+          && Objects.equals(this.completedTimers, that.completedTimers)
+          && Objects.equals(this.setTimers, that.setTimers)
+          && Objects.equals(this.deletedTimers, that.deletedTimers);
+    }
+  }
+
+  /**
+   * A pair of {@link TimerData} and key which can be delivered to the 
appropriate
+   * {@link AppliedPTransform}. A timer fires at the transform that set it 
with a specific key when
+   * the time domain in which it lives progresses past a specified time, as 
determined by the
+   * {@link InMemoryWatermarkManager}.
+   */
+  public static class FiredTimers {
+    private final Map<TimeDomain, ? extends Collection<TimerData>> timers;
+
+    private FiredTimers(Map<TimeDomain, ? extends Collection<TimerData>> 
timers) {
+      this.timers = timers;
+    }
+
+    /**
+     * Gets all of the timers that have fired within the provided {@link 
TimeDomain}. If no timers
+     * fired within the provided domain, return an empty collection.
+     *
+     * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of 
increasing timestamp.
+     */
+    public Collection<TimerData> getTimers(TimeDomain domain) {
+      Collection<TimerData> domainTimers = timers.get(domain);
+      if (domainTimers == null) {
+        return Collections.emptyList();
+      }
+      return domainTimers;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(FiredTimers.class).add("timers", 
timers).toString();
+    }
+  }
+
+  private static class WindowedValueByTimestampComparator extends 
Ordering<WindowedValue<?>> {
+    @Override
+    public int compare(WindowedValue<?> o1, WindowedValue<?> o2) {
+      return ComparisonChain.start()
+          .compare(o1.getTimestamp(), o2.getTimestamp())
+          .result();
+    }
+  }
+
+  public Set<AppliedPTransform<?, ?, ?>> getCompletedTransforms() {
+    Set<AppliedPTransform<?, ?, ?>> result = new HashSet<>();
+    for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> wms :
+        transformToWatermarks.entrySet()) {
+      if (wms.getValue().getOutputWatermark().equals(THE_END_OF_TIME.get())) {
+        result.add(wms.getKey());
+      }
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
new file mode 100644
index 0000000..bc9b04c
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory that produces bundles that perform no additional validation.
+ */
+class InProcessBundleFactory implements BundleFactory {
+  public static InProcessBundleFactory create() {
+    return new InProcessBundleFactory();
+  }
+
+  private InProcessBundleFactory() {}
+
+  @Override
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    return InProcessBundle.create(output, null);
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, 
PCollection<T> output) {
+    return InProcessBundle.create(output, input.getKey());
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, @Nullable Object key, PCollection<T> output) {
+    return InProcessBundle.create(output, key);
+  }
+
+  /**
+   * A {@link UncommittedBundle} that buffers elements in memory.
+   */
+  private static final class InProcessBundle<T> implements 
UncommittedBundle<T> {
+    private final PCollection<T> pcollection;
+    @Nullable private final Object key;
+    private boolean committed = false;
+    private ImmutableList.Builder<WindowedValue<T>> elements;
+
+    /**
+     * Create a new {@link InProcessBundle} for the specified {@link 
PCollection}.
+     */
+    public static <T> InProcessBundle<T> create(PCollection<T> pcollection, 
@Nullable Object key) {
+      return new InProcessBundle<T>(pcollection, key);
+    }
+
+    private InProcessBundle(PCollection<T> pcollection, Object key) {
+      this.pcollection = pcollection;
+      this.key = key;
+      this.elements = ImmutableList.builder();
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return pcollection;
+    }
+
+    @Override
+    public InProcessBundle<T> add(WindowedValue<T> element) {
+      checkState(
+          !committed,
+          "Can't add element %s to committed bundle in PCollection %s",
+          element,
+          pcollection);
+      elements.add(element);
+      return this;
+    }
+
+    @Override
+    public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) 
{
+      checkState(!committed, "Can't commit already committed bundle %s", this);
+      committed = true;
+      final Iterable<WindowedValue<T>> committedElements = elements.build();
+      return new CommittedInProcessBundle<>(
+          pcollection, key, committedElements, synchronizedCompletionTime);
+    }
+  }
+
+  private static class CommittedInProcessBundle<T> implements 
CommittedBundle<T> {
+    public CommittedInProcessBundle(
+        PCollection<T> pcollection,
+        Object key,
+        Iterable<WindowedValue<T>> committedElements,
+        Instant synchronizedCompletionTime) {
+      this.pcollection = pcollection;
+      this.key = key;
+      this.committedElements = committedElements;
+      this.synchronizedCompletionTime = synchronizedCompletionTime;
+    }
+
+    private final PCollection<T> pcollection;
+    private final Object key;
+    private final Iterable<WindowedValue<T>> committedElements;
+    private final Instant synchronizedCompletionTime;
+
+    @Override
+    @Nullable
+    public Object getKey() {
+      return key;
+    }
+
+    @Override
+    public Iterable<WindowedValue<T>> getElements() {
+      return committedElements;
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return pcollection;
+    }
+
+    @Override
+    public Instant getSynchronizedProcessingOutputWatermark() {
+      return synchronizedCompletionTime;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .omitNullValues()
+          .add("pcollection", pcollection)
+          .add("key", key)
+          .add("elements", committedElements)
+          .toString();
+    }
+
+    @Override
+    public CommittedBundle<T> withElements(Iterable<WindowedValue<T>> 
elements) {
+      return new CommittedInProcessBundle<>(
+          pcollection, key, ImmutableList.copyOf(elements), 
synchronizedCompletionTime);
+    }
+  }
+}


Reply via email to