[BEAM-65] SplittableDoFn prototype.

Work in progress. Currently runs only in the direct runner,
and is not ready for use by real users.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0a24883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0a24883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0a24883

Branch: refs/heads/master
Commit: a0a24883737850052f54290255780e868c0b63dc
Parents: a5d1293
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Thu Aug 11 17:13:53 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Wed Oct 12 17:29:20 2016 -0700

----------------------------------------------------------------------
 runners/core-java/pom.xml                       |   6 +
 .../runners/core/ElementAndRestriction.java     |  42 ++
 .../core/ElementAndRestrictionCoder.java        |  67 +++
 .../runners/core/GBKIntoKeyedWorkItems.java     |  40 ++
 .../beam/runners/core/SplittableParDo.java      | 469 ++++++++++++++++
 .../core/ElementAndRestrictionCoderTest.java    | 127 +++++
 .../beam/runners/core/SplittableParDoTest.java  | 467 ++++++++++++++++
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  66 +++
 .../beam/runners/direct/DirectRunner.java       |   5 +
 .../runners/direct/ParDoOverrideFactory.java    |  55 ++
 .../beam/runners/direct/SplittableDoFnTest.java | 225 ++++++++
 .../beam/sdk/annotations/Experimental.java      |   8 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    | 218 +++++++-
 .../beam/sdk/transforms/DoFnAdapters.java       |  27 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  | 117 ++--
 .../org/apache/beam/sdk/transforms/ParDo.java   |  19 +
 .../sdk/transforms/reflect/DoFnInvoker.java     |  26 +-
 .../sdk/transforms/reflect/DoFnInvokers.java    | 179 +++++-
 .../sdk/transforms/reflect/DoFnSignature.java   | 114 +++-
 .../sdk/transforms/reflect/DoFnSignatures.java  | 366 ++++++++++++-
 .../splittabledofn/RestrictionTracker.java      |  42 ++
 .../transforms/splittabledofn/package-info.java |  22 +
 .../org/apache/beam/sdk/coders/KvCoderTest.java |  99 ++--
 .../apache/beam/sdk/transforms/ParDoTest.java   |  46 ++
 .../transforms/reflect/DoFnInvokersTest.java    | 261 ++++++++-
 .../DoFnSignaturesProcessElementTest.java       |   4 +-
 .../DoFnSignaturesSplittableDoFnTest.java       | 543 +++++++++++++++++++
 27 files changed, 3495 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index d958dd2..d84c420 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -190,6 +190,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>annotations</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java
new file mode 100644
index 0000000..4a5d0c4
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * A value class pairing an element with the restriction that governs how a
+ * <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} processes it.
+ *
+ * @param <ElementT> type of the element
+ * @param <RestrictionT> type of the restriction
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+@AutoValue
+public abstract class ElementAndRestriction<ElementT, RestrictionT> {
+  /** Returns the element to be processed. */
+  public abstract ElementT element();
+
+  /** Returns the restriction under which {@link #element()} is processed. */
+  public abstract RestrictionT restriction();
+
+  /**
+   * Creates an {@link ElementAndRestriction} from the given element and restriction.
+   *
+   * @param element the element to process
+   * @param restriction the restriction applied to processing {@code element}
+   */
+  public static <ElementT, RestrictionT> ElementAndRestriction<ElementT, RestrictionT> of(
+      ElementT element, RestrictionT restriction) {
+    return new AutoValue_ElementAndRestriction<>(element, restriction);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
new file mode 100644
index 0000000..6dec8e2
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+
+/**
+ * A {@link Coder} for {@link ElementAndRestriction}.
+ *
+ * <p>The element is written first using {@code elementCoder} in a nested context, because more
+ * data follows it in the stream. The restriction is written last using {@code restrictionCoder}
+ * in the caller's context.
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class ElementAndRestrictionCoder<ElementT, RestrictionT>
+    extends CustomCoder<ElementAndRestriction<ElementT, RestrictionT>> {
+  /** Coder for the element half of the pair. */
+  private final Coder<ElementT> elementCoder;
+
+  /** Coder for the restriction half of the pair. */
+  private final Coder<RestrictionT> restrictionCoder;
+
+  private ElementAndRestrictionCoder(
+      Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) {
+    this.elementCoder = elementCoder;
+    this.restrictionCoder = restrictionCoder;
+  }
+
+  /**
+   * Returns an {@link ElementAndRestrictionCoder} built from the given element and restriction
+   * coders.
+   *
+   * @param elementCoder coder used for {@link ElementAndRestriction#element()}
+   * @param restrictionCoder coder used for {@link ElementAndRestriction#restriction()}
+   */
+  public static <ElementT, RestrictionT> ElementAndRestrictionCoder<ElementT, RestrictionT> of(
+      Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) {
+    return new ElementAndRestrictionCoder<>(elementCoder, restrictionCoder);
+  }
+
+  /**
+   * Writes the element followed by the restriction.
+   *
+   * @throws CoderException if {@code value} is null
+   */
+  @Override
+  public void encode(
+      ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream, Context context)
+      throws IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null ElementAndRestriction");
+    }
+    // The element is followed by the restriction, so it is coded in a nested context.
+    Context elementContext = context.nested();
+    elementCoder.encode(value.element(), outStream, elementContext);
+    restrictionCoder.encode(value.restriction(), outStream, context);
+  }
+
+  /** Reads an element followed by a restriction, mirroring {@link #encode}. */
+  @Override
+  public ElementAndRestriction<ElementT, RestrictionT> decode(
+      InputStream inStream, Context context) throws IOException {
+    ElementT element = elementCoder.decode(inStream, context.nested());
+    RestrictionT restriction = restrictionCoder.decode(inStream, context);
+    return ElementAndRestriction.of(element, restriction);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
new file mode 100644
index 0000000..ca4d681
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A {@link GroupByKey GroupByKey-like} {@link PTransform} that turns a keyed {@link PCollection}
+ * into {@link KeyedWorkItem KeyedWorkItems}, so that downstream transforms can access state and
+ * timers.
+ *
+ * <p>This class does not perform the grouping itself: {@link #apply} only declares a primitive
+ * output that keeps the input's windowing strategy and boundedness. The grouping is
+ * runner-specific, and a runner is expected to substitute its own implementation.
+ *
+ * @param <KeyT> key type of the input
+ * @param <InputT> value type of the input
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class GBKIntoKeyedWorkItems<KeyT, InputT>
+    extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
+  /**
+   * Declares the output as a primitive {@link PCollection} with the same pipeline, windowing
+   * strategy and boundedness as {@code input}.
+   */
+  @Override
+  public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
+    return PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
new file mode 100644
index 0000000..7645149
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import java.util.UUID;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * A utility transform that executes a <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} by expanding it into a
+ * network of simpler transforms:
+ *
+ * <ol>
+ * <li>Pair each element with an initial restriction
+ * <li>Split each restriction into sub-restrictions
+ * <li>Assign a unique key to each element/restriction pair
+ * <li>Group by key (so that work is partitioned by key and we can access state/timers)
+ * <li>Process each keyed element/restriction pair with the splittable {@link DoFn}'s {@link
+ *     DoFn.ProcessElement} method, using state and timers API.
+ * </ol>
+ *
+ * <p>This transform is intended as a helper for internal use by runners when implementing {@code
+ * ParDo.of(splittable DoFn)}, but not for direct use by pipeline writers.
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class SplittableParDo<
+        InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+    extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
+  private final DoFn<InputT, OutputT> fn;
+  private final DoFnSignature signature;
+
+  /**
+   * Creates the transform for the given splittable {@link DoFn}.
+   *
+   * @param fn The splittable {@link DoFn} inside the original {@link ParDo} transform.
+   * @throws NullPointerException if {@code fn} is null
+   * @throws IllegalArgumentException if the {@link DoFn.ProcessElement} method of {@code fn} is
+   *     not splittable
+   */
+  public SplittableParDo(DoFn<InputT, OutputT> fn) {
+    checkNotNull(fn, "fn must not be null");
+    this.fn = fn;
+    this.signature = DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass());
+    checkArgument(signature.processElement().isSplittable(), "fn must be a splittable DoFn");
+  }
+
+  @Override
+  public PCollection<OutputT> apply(PCollection<InputT> input) {
+    PCollection.IsBounded isFnBounded = signature.isBoundedPerElement();
+    Coder<RestrictionT> restrictionCoder =
+        DoFnInvokers.INSTANCE
+            .newByteBuddyInvoker(fn)
+            .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
+    Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder =
+        ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder);
+
+    PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> keyedWorkItems =
+        input
+            .apply(
+                "Pair with initial restriction",
+                ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(fn)))
+            .setCoder(splitCoder)
+            .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn)))
+            .setCoder(splitCoder)
+            .apply(
+                "Assign unique key",
+                WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()))
+            .apply(
+                "Group by key",
+                new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>());
+    checkArgument(
+        keyedWorkItems.getWindowingStrategy().getWindowFn() instanceof GlobalWindows,
+        "GBKIntoKeyedWorkItems must produce a globally windowed collection, "
+            + "but windowing strategy was: %s",
+        keyedWorkItems.getWindowingStrategy());
+    // The output is bounded only if both the input and the fn (per element) are bounded, and it
+    // gets back the input's windowing strategy, which the grouping step replaced with a global one.
+    return keyedWorkItems
+        .apply(
+            "Process",
+            ParDo.of(
+                new ProcessFn<InputT, OutputT, RestrictionT, TrackerT>(
+                    fn,
+                    input.getCoder(),
+                    restrictionCoder,
+                    input.getWindowingStrategy().getWindowFn().windowCoder())))
+        .setIsBoundedInternal(input.isBounded().and(isFnBounded))
+        .setWindowingStrategyInternal(input.getWindowingStrategy());
+  }
+
+  /**
+   * Assigns a random unique key to each element of the input collection, so that the output
+   * collection is effectively the same elements as input, but the per-key state and timers are now
+   * effectively per-element.
+   */
+  private static class RandomUniqueKeyFn<T> implements SerializableFunction<T, String> {
+    @Override
+    public String apply(T input) {
+      return UUID.randomUUID().toString();
+    }
+  }
+
+  /**
+   * Pairs each input element with its initial restriction using the given splittable {@link DoFn}.
+   */
+  private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT>
+      extends DoFn<InputT, ElementAndRestriction<InputT, RestrictionT>> {
+    private final DoFn<InputT, OutputT> fn;
+    private transient DoFnInvoker<InputT, OutputT> invoker;
+
+    PairWithRestrictionFn(DoFn<InputT, OutputT> fn) {
+      this.fn = fn;
+    }
+
+    @Setup
+    public void setup() {
+      invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      context.output(
+          ElementAndRestriction.of(
+              context.element(),
+              invoker.<RestrictionT>invokeGetInitialRestriction(context.element())));
+    }
+  }
+
+  /**
+   * The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair
+   * by creating a tracker for the restriction and checkpointing/resuming processing later if
+   * necessary.
+   *
+   * <p>TODO: This uses deprecated OldDoFn since DoFn does not provide access to state/timer
+   * internals. This should be rewritten to use the <a href="https://s.apache.org/beam-state">State
+   * and Timers API</a> once it is available.
+   */
+  @VisibleForTesting
+  static class ProcessFn<
+          InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+      extends OldDoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
+    // Commit at least once every 10k output records.  This keeps the watermark advancing
+    // smoothly, and ensures that not too much work will have to be reprocessed in the event of
+    // a crash.
+    // TODO: Also commit at least once every N seconds (runner-specific parameter).
+    @VisibleForTesting static final int MAX_OUTPUTS_PER_BUNDLE = 10000;
+
+    /**
+     * The state cell containing a watermark hold for the output of this {@link DoFn}.
+     *
+     * <p>Whenever a {@link DoFn.ProcessElement} call asks to be resumed, the hold is updated with
+     * the future output watermark reported by {@link DoFn.ProcessContinuation}. If the call does
+     * not report one, the hold is placed at the element's timestamp instead. The hold is released
+     * when a {@link DoFn.ProcessElement} call returns {@link DoFn.ProcessContinuation#stop}.
+     *
+     * <p>A hold is needed to avoid letting the output watermark immediately progress together with
+     * the input watermark when the first {@link DoFn.ProcessElement} call for this element
+     * completes.
+     */
+    private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag =
+        StateTags.makeSystemTagInternal(
+            StateTags.<GlobalWindow>watermarkStateInternal(
+                "hold", OutputTimeFns.outputAtLatestInputTimestamp()));
+
+    /**
+     * The state cell containing a copy of the element. Written during the first {@link
+     * DoFn.ProcessElement} call and read during subsequent calls in response to timer firings, when
+     * the original element is no longer available.
+     */
+    private final StateTag<Object, ValueState<WindowedValue<InputT>>> elementTag;
+
+    /**
+     * The state cell containing a restriction representing the unprocessed part of work for this
+     * element.
+     */
+    private final StateTag<Object, ValueState<RestrictionT>> restrictionTag;
+
+    private final DoFn<InputT, OutputT> fn;
+    private final Coder<? extends BoundedWindow> windowCoder;
+
+    /** Created in {@link #setup()}; not serialized with this fn. */
+    private transient DoFnInvoker<InputT, OutputT> invoker;
+
+    ProcessFn(
+        DoFn<InputT, OutputT> fn,
+        Coder<InputT> elementCoder,
+        Coder<RestrictionT> restrictionCoder,
+        Coder<? extends BoundedWindow> windowCoder) {
+      this.fn = fn;
+      this.windowCoder = windowCoder;
+      this.elementTag =
+          StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder));
+      this.restrictionTag = StateTags.value("restriction", restrictionCoder);
+    }
+
+    @Override
+    public void setup() throws Exception {
+      invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    }
+
+    @Override
+    public void processElement(final ProcessContext c) {
+      // Initialize state (element and restriction) depending on whether this is the seed call.
+      // The seed call is the first call for this element, which actually has the element.
+      // Subsequent calls are timer firings and the element has to be retrieved from the state.
+      TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null);
+      boolean isSeedCall = (timer == null);
+      StateNamespace stateNamespace = isSeedCall ? StateNamespaces.global() : timer.getNamespace();
+      ValueState<WindowedValue<InputT>> elementState =
+          c.windowingInternals().stateInternals().state(stateNamespace, elementTag);
+      ValueState<RestrictionT> restrictionState =
+          c.windowingInternals().stateInternals().state(stateNamespace, restrictionTag);
+      WatermarkHoldState<GlobalWindow> holdState =
+          c.windowingInternals().stateInternals().state(stateNamespace, watermarkHoldTag);
+
+      ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
+      if (isSeedCall) {
+        // The element and restriction are available in c.element().
+        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
+            Iterables.getOnlyElement(c.element().elementsIterable());
+        WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element());
+        elementState.write(element);
+        elementAndRestriction =
+            ElementAndRestriction.of(element, windowedValue.getValue().restriction());
+      } else {
+        // This is not the first ProcessElement call for this element/restriction - rather,
+        // this is a timer firing, so we need to fetch the element and restriction from state.
+        elementState.readLater();
+        restrictionState.readLater();
+        elementAndRestriction =
+            ElementAndRestriction.of(elementState.read(), restrictionState.read());
+      }
+
+      final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.restriction());
+      @SuppressWarnings("unchecked")
+      final RestrictionT[] residual = (RestrictionT[]) new Object[1];
+      // TODO: Only let the call run for a limited amount of time, rather than simply
+      // producing a limited amount of output.
+      DoFn.ProcessContinuation cont =
+          invoker.invokeProcessElement(
+              makeContext(c, elementAndRestriction.element(), tracker, residual),
+              wrapTracker(tracker));
+      if (residual[0] == null) {
+        // This means the call completed unsolicited, and the context produced by makeContext()
+        // did not take a checkpoint. Take one now.
+        residual[0] = checkNotNull(tracker.checkpoint());
+      }
+
+      // Save state for resuming.
+      if (!cont.shouldResume()) {
+        // All work for this element/restriction is completed. Clear state and release hold.
+        elementState.clear();
+        restrictionState.clear();
+        holdState.clear();
+        return;
+      }
+      restrictionState.write(residual[0]);
+      Instant futureOutputWatermark = cont.getWatermark();
+      if (futureOutputWatermark == null) {
+        // The fn did not report a future output watermark. Hold at the element's timestamp so
+        // that the output watermark cannot advance past the element before processing resumes.
+        futureOutputWatermark = elementAndRestriction.element().getTimestamp();
+      }
+      holdState.add(futureOutputWatermark);
+      // Set a timer to continue processing this element.
+      TimerInternals timerInternals = c.windowingInternals().timerInternals();
+      timerInternals.setTimer(
+          TimerInternals.TimerData.of(
+              stateNamespace,
+              timerInternals.currentProcessingTime().plus(cont.resumeDelay()),
+              TimeDomain.PROCESSING_TIME));
+    }
+
+    /**
+     * Creates the {@link DoFn.ProcessContext} passed to the splittable fn. Outputs are emitted in
+     * the windows and pane of {@code element}. After {@link #MAX_OUTPUTS_PER_BUNDLE} outputs, a
+     * checkpoint is requested and its residual is stored in {@code residualRestrictionHolder[0]}.
+     */
+    private DoFn<InputT, OutputT>.ProcessContext makeContext(
+        final ProcessContext baseContext,
+        final WindowedValue<InputT> element,
+        final TrackerT tracker,
+        final RestrictionT[] residualRestrictionHolder) {
+      return fn.new ProcessContext() {
+        private int numOutputs = 0;
+
+        @Override
+        public InputT element() {
+          return element.getValue();
+        }
+
+        @Override
+        public Instant timestamp() {
+          return element.getTimestamp();
+        }
+
+        @Override
+        public PaneInfo pane() {
+          return element.getPane();
+        }
+
+        @Override
+        public void output(OutputT output) {
+          baseContext
+              .windowingInternals()
+              .outputWindowedValue(
+                  output, element.getTimestamp(), element.getWindows(), element.getPane());
+          noteOutput();
+        }
+
+        @Override
+        public void outputWithTimestamp(OutputT output, Instant timestamp) {
+          baseContext
+              .windowingInternals()
+              .outputWindowedValue(output, timestamp, element.getWindows(), element.getPane());
+          noteOutput();
+        }
+
+        private void noteOutput() {
+          ++numOutputs;
+          // Request a checkpoint once. The fn *may* produce more output, but hopefully not too
+          // much. A second checkpoint() would overwrite the residual stored by the first one, and
+          // that residual would then never be written to state.
+          if (numOutputs >= MAX_OUTPUTS_PER_BUNDLE && residualRestrictionHolder[0] == null) {
+            residualRestrictionHolder[0] = tracker.checkpoint();
+          }
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view) {
+          return baseContext.sideInput(view);
+        }
+
+        @Override
+        public PipelineOptions getPipelineOptions() {
+          return baseContext.getPipelineOptions();
+        }
+
+        @Override
+        public <T> void sideOutput(TupleTag<T> tag, T output) {
+          // TODO: I'm not sure how to implement this correctly: there's no
+          // "internals.sideOutputWindowedValue".
+          throw new UnsupportedOperationException(
+              "Side outputs not yet supported by splittable DoFn");
+        }
+
+        @Override
+        public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+          // TODO: I'm not sure how to implement this correctly: there's no
+          // "internals.sideOutputWindowedValue".
+          throw new UnsupportedOperationException(
+              "Side outputs not yet supported by splittable DoFn");
+        }
+
+        @Override
+        protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+            String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+          return fn.createAggregator(name, combiner);
+        }
+      };
+    }
+
+    /** Creates an {@link DoFn.ExtraContextFactory} that provides just the given tracker. */
+    private DoFn.ExtraContextFactory<InputT, OutputT> wrapTracker(final TrackerT tracker) {
+      return new ExtraContextFactoryForTracker<>(tracker);
+    }
+
+    /**
+     * An {@link DoFn.ExtraContextFactory} that only supplies a restriction tracker; every other
+     * kind of extra context throws {@link IllegalStateException}.
+     */
+    private static class ExtraContextFactoryForTracker<
+            InputT, OutputT, TrackerT extends RestrictionTracker<?>>
+        implements DoFn.ExtraContextFactory<InputT, OutputT> {
+      private final TrackerT tracker;
+
+      ExtraContextFactoryForTracker(TrackerT tracker) {
+        this.tracker = tracker;
+      }
+
+      @Override
+      public BoundedWindow window() {
+        // DoFnSignatures should have verified that this DoFn doesn't access extra context.
+        throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
+      }
+
+      @Override
+      public DoFn.InputProvider<InputT> inputProvider() {
+        // DoFnSignatures should have verified that this DoFn doesn't access extra context.
+        throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
+      }
+
+      @Override
+      public DoFn.OutputReceiver<OutputT> outputReceiver() {
+        // DoFnSignatures should have verified that this DoFn doesn't access extra context.
+        throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
+      }
+
+      @Override
+      public WindowingInternals<InputT, OutputT> windowingInternals() {
+        // DoFnSignatures should have verified that this DoFn doesn't access extra context.
+        throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
+      }
+
+      @Override
+      public TrackerT restrictionTracker() {
+        return tracker;
+      }
+    }
+  }
+
+  /** Splits the restriction using the given {@link DoFn.SplitRestriction} method. */
+  private static class SplitRestrictionFn<InputT, RestrictionT>
+      extends DoFn<
+          ElementAndRestriction<InputT, RestrictionT>,
+          ElementAndRestriction<InputT, RestrictionT>> {
+    private final DoFn<InputT, ?> splittableFn;
+    private transient DoFnInvoker<InputT, ?> invoker;
+
+    SplitRestrictionFn(DoFn<InputT, ?> splittableFn) {
+      this.splittableFn = splittableFn;
+    }
+
+    @Setup
+    public void setup() {
+      invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(splittableFn);
+    }
+
+    @ProcessElement
+    public void processElement(final ProcessContext c) {
+      final InputT element = c.element().element();
+      invoker.invokeSplitRestriction(
+          element,
+          c.element().restriction(),
+          new OutputReceiver<RestrictionT>() {
+            @Override
+            public void output(RestrictionT part) {
+              c.output(ElementAndRestriction.of(element, part));
+            }
+          });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java
new file mode 100644
index 0000000..f516046
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+/**
+ * Tests for {@link ElementAndRestrictionCoder}. Parroted from {@link
+ * org.apache.beam.sdk.coders.KvCoderTest}.
+ */
+@RunWith(Parameterized.class)
+public class ElementAndRestrictionCoderTest<K, V> {
+  private static class CoderAndData<T> {
+    Coder<T> coder;
+    List<T> data;
+  }
+
+  private static class AnyCoderAndData {
+    private CoderAndData<?> coderAndData;
+  }
+
+  private static <T> AnyCoderAndData coderAndData(Coder<T> coder, List<T> 
data) {
+    CoderAndData<T> coderAndData = new CoderAndData<>();
+    coderAndData.coder = coder;
+    coderAndData.data = data;
+    AnyCoderAndData res = new AnyCoderAndData();
+    res.coderAndData = coderAndData;
+    return res;
+  }
+
+  private static final List<AnyCoderAndData> TEST_DATA =
+      Arrays.asList(
+          coderAndData(
+              VarIntCoder.of(), Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, 
Integer.MIN_VALUE)),
+          coderAndData(
+              BigEndianLongCoder.of(),
+              Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE)),
+          coderAndData(StringUtf8Coder.of(), Arrays.asList("", "hello", 
"goodbye", "1")),
+          coderAndData(
+              ElementAndRestrictionCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()),
+              Arrays.asList(
+                  ElementAndRestriction.of("", -1),
+                  ElementAndRestriction.of("hello", 0),
+                  ElementAndRestriction.of("goodbye", Integer.MAX_VALUE))),
+          coderAndData(
+              ListCoder.of(VarLongCoder.of()),
+              Arrays.asList(Arrays.asList(1L, 2L, 3L), 
Collections.<Long>emptyList())));
+
+  @Parameterized.Parameters(name = "{index}: keyCoder={0} key={1} 
valueCoder={2} value={3}")
+  public static Collection<Object[]> data() {
+    List<Object[]> parameters = new ArrayList<>();
+    for (AnyCoderAndData keyCoderAndData : TEST_DATA) {
+      Coder keyCoder = keyCoderAndData.coderAndData.coder;
+      for (Object key : keyCoderAndData.coderAndData.data) {
+        for (AnyCoderAndData valueCoderAndData : TEST_DATA) {
+          Coder valueCoder = valueCoderAndData.coderAndData.coder;
+          for (Object value : valueCoderAndData.coderAndData.data) {
+            parameters.add(new Object[] {keyCoder, key, valueCoder, value});
+          }
+        }
+      }
+    }
+    return parameters;
+  }
+
+  @Parameter(0)
+  public Coder<K> keyCoder;
+  @Parameter(1)
+  public K key;
+  @Parameter(2)
+  public Coder<V> valueCoder;
+  @Parameter(3)
+  public V value;
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testDecodeEncodeEqual() throws Exception {
+    CoderProperties.coderDecodeEncodeEqual(
+        ElementAndRestrictionCoder.of(keyCoder, valueCoder),
+        ElementAndRestriction.of(key, value));
+  }
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void encodeNullThrowsCoderException() throws Exception {
+    thrown.expect(CoderException.class);
+    thrown.expectMessage("cannot encode a null ElementAndRestriction");
+
+    CoderUtils.encodeToBase64(
+        ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), 
null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
new file mode 100644
index 0000000..a76c4da
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SplittableParDo}. */
+@RunWith(JUnit4.class)
+public class SplittableParDoTest {
+  // ----------------- Tests for whether the transform sets boundedness correctly --------------
+  /** A stateless restriction; only its type matters to these tests. */
+  private static class SomeRestriction implements Serializable {}
+
+  /** A tracker that always reports the same {@link SomeRestriction}, also on checkpoint. */
+  private static class SomeRestrictionTracker implements RestrictionTracker<SomeRestriction> {
+    private final SomeRestriction someRestriction = new SomeRestriction();
+
+    @Override
+    public SomeRestriction currentRestriction() {
+      return someRestriction;
+    }
+
+    @Override
+    public SomeRestriction checkpoint() {
+      return someRestriction;
+    }
+  }
+
+  /**
+   * A splittable {@link DoFn} whose {@link DoFn.ProcessElement} method returns {@code void}. Used
+   * only to check output boundedness; the pipelines in those tests are never run.
+   */
+  private static class BoundedFakeFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer element) {
+      return null;
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return null;
+    }
+  }
+
+  /**
+   * The unbounded counterpart of {@link BoundedFakeFn}: its {@link DoFn.ProcessElement} method
+   * returns a {@link DoFn.ProcessContinuation}.
+   */
+  private static class UnboundedFakeFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public ProcessContinuation processElement(
+        ProcessContext context, SomeRestrictionTracker tracker) {
+      return stop();
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer element) {
+      return null;
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return null;
+    }
+  }
+
+  private static PCollection<Integer> makeUnboundedCollection(Pipeline pipeline) {
+    return pipeline
+        .apply("unbounded", Create.of(1, 2, 3))
+        .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+  }
+
+  private static PCollection<Integer> makeBoundedCollection(Pipeline pipeline) {
+    return pipeline
+        .apply("bounded", Create.of(1, 2, 3))
+        .setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBoundednessForBoundedFn() {
+    Pipeline pipeline = TestPipeline.create();
+    DoFn<Integer, String> boundedFn = new BoundedFakeFn();
+    assertEquals(
+        "Applying a bounded SDF to a bounded collection produces a bounded collection",
+        PCollection.IsBounded.BOUNDED,
+        makeBoundedCollection(pipeline)
+            .apply("bounded to bounded", new SplittableParDo<>(boundedFn))
+            .isBounded());
+    assertEquals(
+        "Applying a bounded SDF to an unbounded collection produces an unbounded collection",
+        PCollection.IsBounded.UNBOUNDED,
+        makeUnboundedCollection(pipeline)
+            .apply("bounded to unbounded", new SplittableParDo<>(boundedFn))
+            .isBounded());
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBoundednessForUnboundedFn() {
+    Pipeline pipeline = TestPipeline.create();
+    DoFn<Integer, String> unboundedFn = new UnboundedFakeFn();
+    assertEquals(
+        "Applying an unbounded SDF to a bounded collection produces an unbounded collection",
+        PCollection.IsBounded.UNBOUNDED,
+        makeBoundedCollection(pipeline)
+            .apply("unbounded to bounded", new SplittableParDo<>(unboundedFn))
+            .isBounded());
+    assertEquals(
+        "Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
+        PCollection.IsBounded.UNBOUNDED,
+        makeUnboundedCollection(pipeline)
+            .apply("unbounded to unbounded", new SplittableParDo<>(unboundedFn))
+            .isBounded());
+  }
+
+  // ------------------------------- Tests for ProcessFn ---------------------------------
+
+  /**
+   * A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but possibly over multiple
+   * {@link DoFn.ProcessElement} calls).
+   */
+  private static class ProcessFnTester<
+      InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> {
+    private final DoFnTester<
+            KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+        tester;
+    private Instant currentProcessingTime;
+
+    /** Wraps {@code fn} in a ProcessFn, starts a bundle and sets processing time to the given one. */
+    ProcessFnTester(
+        Instant currentProcessingTime,
+        DoFn<InputT, OutputT> fn,
+        Coder<InputT> inputCoder,
+        Coder<RestrictionT> restrictionCoder)
+        throws Exception {
+      SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+          new SplittableParDo.ProcessFn<>(
+              fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
+      this.tester = DoFnTester.of(processFn);
+      this.tester.startBundle();
+      this.tester.advanceProcessingTime(currentProcessingTime);
+
+      this.currentProcessingTime = currentProcessingTime;
+    }
+
+    /** Performs a seed {@link DoFn.ProcessElement} call feeding the element and restriction. */
+    void startElement(InputT element, RestrictionT restriction) throws Exception {
+      startElement(
+          WindowedValue.of(
+              ElementAndRestriction.of(element, restriction),
+              currentProcessingTime,
+              GlobalWindow.INSTANCE,
+              PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+
+    /** Performs a seed {@link DoFn.ProcessElement} call with an explicitly windowed value. */
+    void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue)
+        throws Exception {
+      tester.processElement(KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
+    }
+
+    /**
+     * Advances processing time by a given duration and, if any timers fired, performs a non-seed
+     * {@link DoFn.ProcessElement} call, feeding it the timers.
+     *
+     * @return whether any timers fired (and hence whether the fn was invoked again)
+     */
+    boolean advanceProcessingTimeBy(Duration duration) throws Exception {
+      currentProcessingTime = currentProcessingTime.plus(duration);
+      List<TimerInternals.TimerData> timers = tester.advanceProcessingTime(currentProcessingTime);
+      if (timers.isEmpty()) {
+        return false;
+      }
+      tester.processElement(
+          KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
+              "key", timers));
+      return true;
+    }
+
+    List<TimestampedValue<OutputT>> peekOutputElementsInWindow(BoundedWindow window) {
+      return tester.peekOutputElementsInWindow(window);
+    }
+
+    List<OutputT> takeOutputElements() {
+      return tester.takeOutputElements();
+    }
+  }
+
+  /** A simple splittable {@link DoFn} that's actually monolithic. */
+  private static class ToStringFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public void process(ProcessContext c, SomeRestrictionTracker tracker) {
+      c.output(c.element().toString() + "a");
+      c.output(c.element().toString() + "b");
+      c.output(c.element().toString() + "c");
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer elem) {
+      return new SomeRestriction();
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return new SomeRestrictionTracker();
+    }
+  }
+
+  @Test
+  public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Exception {
+    // Tests that ProcessFn correctly propagates windows and timestamp of the element
+    // inside the KeyedWorkItem.
+    // The underlying DoFn is actually monolithic, so this doesn't test splitting.
+    DoFn<Integer, String> fn = new ToStringFn();
+
+    Instant base = Instant.now();
+    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class));
+
+    IntervalWindow w1 =
+        new IntervalWindow(
+            base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1)));
+    IntervalWindow w2 =
+        new IntervalWindow(
+            base.minus(Duration.standardMinutes(2)), base.plus(Duration.standardMinutes(2)));
+    IntervalWindow w3 =
+        new IntervalWindow(
+            base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3)));
+
+    tester.startElement(
+        WindowedValue.of(
+            ElementAndRestriction.of(42, new SomeRestriction()),
+            base,
+            Arrays.asList(w1, w2, w3),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) {
+      assertEquals(
+          Arrays.asList(
+              TimestampedValue.of("42a", base),
+              TimestampedValue.of("42b", base),
+              TimestampedValue.of("42c", base)),
+          tester.peekOutputElementsInWindow(w));
+    }
+  }
+
+  /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. */
+  private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) {
+      c.output(c.element().toString());
+      return resume().withResumeDelay(Duration.standardSeconds(5)).withWatermark(c.timestamp());
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer elem) {
+      return new SomeRestriction();
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return new SomeRestrictionTracker();
+    }
+  }
+
+  @Test
+  public void testResumeSetsTimer() throws Exception {
+    DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
+    Instant base = Instant.now();
+    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class));
+
+    tester.startElement(42, new SomeRestriction());
+    assertThat(tester.takeOutputElements(), contains("42"));
+
+    // Should resume after 5 seconds: advancing by 3 seconds should have no effect.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertTrue(tester.takeOutputElements().isEmpty());
+
+    // 6 seconds should be enough - should invoke the fn again.
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertThat(tester.takeOutputElements(), contains("42"));
+
+    // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertTrue(tester.takeOutputElements().isEmpty());
+
+    // 6 seconds should again be enough.
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertThat(tester.takeOutputElements(), contains("42"));
+  }
+
+  /** A restriction holding the index of the first output not yet produced. */
+  private static class SomeCheckpoint implements Serializable {
+    private final int firstUnprocessedIndex;
+
+    private SomeCheckpoint(int firstUnprocessedIndex) {
+      this.firstUnprocessedIndex = firstUnprocessedIndex;
+    }
+  }
+
+  /**
+   * A tracker over {@link SomeCheckpoint}s: {@link #tryUpdateCheckpoint} succeeds until {@link
+   * #checkpoint} is called, after which every further update is refused.
+   */
+  private static class SomeCheckpointTracker implements RestrictionTracker<SomeCheckpoint> {
+    private SomeCheckpoint current;
+    private boolean isActive = true;
+
+    private SomeCheckpointTracker(SomeCheckpoint current) {
+      this.current = current;
+    }
+
+    @Override
+    public SomeCheckpoint currentRestriction() {
+      return current;
+    }
+
+    public boolean tryUpdateCheckpoint(int firstUnprocessedIndex) {
+      if (!isActive) {
+        return false;
+      }
+      current = new SomeCheckpoint(firstUnprocessedIndex);
+      return true;
+    }
+
+    @Override
+    public SomeCheckpoint checkpoint() {
+      isActive = false;
+      return current;
+    }
+  }
+
+  /**
+   * A splittable {@link DoFn} that generates the sequence [init, init + total) in batches of given
+   * size, where {@code init} is the input element and {@code total} is {@code numTotalOutputs}.
+   */
+  private static class CounterFn extends DoFn<Integer, String> {
+    private final int numTotalOutputs;
+    private final int numOutputsPerCall;
+
+    private CounterFn(int numTotalOutputs, int numOutputsPerCall) {
+      this.numTotalOutputs = numTotalOutputs;
+      this.numOutputsPerCall = numOutputsPerCall;
+    }
+
+    @ProcessElement
+    public ProcessContinuation process(ProcessContext c, SomeCheckpointTracker tracker) {
+      int start = tracker.currentRestriction().firstUnprocessedIndex;
+      for (int i = 0; i < numOutputsPerCall; ++i) {
+        int index = start + i;
+        // Claim index by moving the checkpoint past it; refused once a checkpoint was taken.
+        if (!tracker.tryUpdateCheckpoint(index + 1)) {
+          return resume();
+        }
+        if (index >= numTotalOutputs) {
+          return stop();
+        }
+        c.output(String.valueOf(c.element() + index));
+      }
+      return resume();
+    }
+
+    @GetInitialRestriction
+    public SomeCheckpoint getInitialRestriction(Integer elem) {
+      throw new UnsupportedOperationException("Expected to be supplied explicitly in this test");
+    }
+
+    @NewTracker
+    public SomeCheckpointTracker newTracker(SomeCheckpoint restriction) {
+      return new SomeCheckpointTracker(restriction);
+    }
+  }
+
+  @Test
+  public void testResumeCarriesOverState() throws Exception {
+    DoFn<Integer, String> fn = new CounterFn(3, 1);
+    Instant base = Instant.now();
+    ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class));
+
+    tester.startElement(42, new SomeCheckpoint(0));
+    assertThat(tester.takeOutputElements(), contains("42"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), contains("43"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), contains("44"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    // After outputting all 3 items, should not output anything more.
+    assertEquals(0, tester.takeOutputElements().size());
+    // Should also not ask to resume.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+  }
+
+  @Test
+  public void testReactsToCheckpoint() throws Exception {
+    int max = SplittableParDo.ProcessFn.MAX_OUTPUTS_PER_BUNDLE;
+    // Create an fn that attempts to output 2x more per call than checkpointing allows.
+    DoFn<Integer, String> fn = new CounterFn(2 * max + max / 2, 2 * max);
+    Instant base = Instant.now();
+    int baseIndex = 42;
+
+    ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class));
+
+    List<String> elements;
+
+    tester.startElement(baseIndex, new SomeCheckpoint(0));
+    elements = tester.takeOutputElements();
+    assertEquals(max, elements.size());
+    // Should output the range [0, max)
+    assertThat(elements, hasItem(String.valueOf(baseIndex)));
+    assertThat(elements, hasItem(String.valueOf(baseIndex + max - 1)));
+
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    elements = tester.takeOutputElements();
+    assertEquals(max, elements.size());
+    // Should output the range [max, 2*max)
+    assertThat(elements, hasItem(String.valueOf(baseIndex + max)));
+    assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max - 1)));
+
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    elements = tester.takeOutputElements();
+    assertEquals(max / 2, elements.size());
+    // Should output the range [2*max, 2*max + max/2)
+    assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max)));
+    assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max + max / 2 - 1)));
+    assertThat(elements, not(hasItem(String.valueOf(baseIndex + 2 * max + max / 2))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
new file mode 100644
index 0000000..e232552
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
+import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */
+class DirectGBKIntoKeyedWorkItemsOverrideFactory implements PTransformOverrideFactory {
+  /**
+   * Returns a {@link DirectGBKIntoKeyedWorkItems} with the same name as {@code transform}.
+   *
+   * <p>The replacement is constructed as a raw type because this method is generic only in the
+   * {@link PInput}/{@link POutput} types, not in the key and value types the replacement needs.
+   */
+  @Override
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+      PTransform<InputT, OutputT> transform) {
+    return new DirectGBKIntoKeyedWorkItems(transform.getName());
+  }
+
+  /**
+   * The Direct Runner specific implementation of {@link GBKIntoKeyedWorkItems}.
+   *
+   * <p>Reifies timestamps and windows, sets the global windowing strategy on the intermediate
+   * collection, groups with {@link DirectGroupByKey.DirectGroupByKeyOnly}, and sets a {@link
+   * KeyedWorkItemCoder} built from the input's key, value and window coders.
+   */
+  private static class DirectGBKIntoKeyedWorkItems<KeyT, InputT>
+      extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
+    DirectGBKIntoKeyedWorkItems(String name) {
+      super(name);
+    }
+
+    /**
+     * @throws IllegalArgumentException if {@code input} is not encoded with a {@link KvCoder}
+     */
+    @Override
+    public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
+      checkArgument(
+          input.getCoder() instanceof KvCoder,
+          "Input to %s must be encoded with a KvCoder, but its coder is %s",
+          getName(),
+          input.getCoder());
+      KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
+      return input
+          .apply(new ReifyTimestampsAndWindows<KeyT, InputT>())
+          // TODO: Perhaps windowing strategy should instead be set by ReifyTAW, or by DGBKO
+          .setWindowingStrategyInternal(WindowingStrategy.globalDefault())
+          .apply(new DirectGroupByKey.DirectGroupByKeyOnly<KeyT, InputT>())
+          .setCoder(
+              KeyedWorkItemCoder.of(
+                  kvCoder.getKeyCoder(),
+                  kvCoder.getValueCoder(),
+                  // Window coder of the original input, not of the globally-windowed intermediate.
+                  input.getWindowingStrategy().getWindowFn().windowCoder()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 224101a..a72f7ae 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import 
org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -83,6 +84,10 @@ public class DirectRunner
               .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
               .put(TestStream.class, new DirectTestStreamFactory())
               .put(Write.Bound.class, new WriteWithShardingFactory())
+              .put(ParDo.Bound.class, new ParDoOverrideFactory())
+              .put(
+                  GBKIntoKeyedWorkItems.class,
+                  new DirectGBKIntoKeyedWorkItemsOverrideFactory())
               .build();
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
new file mode 100644
index 0000000..a57735c
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo}
+ * in the direct runner. Currently overrides applications of <a
+ * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
+ */
+class ParDoOverrideFactory implements PTransformOverrideFactory {
+  /**
+   * Replaces a {@link ParDo.Bound} whose {@link DoFn} has a splittable {@link DoFn.ProcessElement}
+   * method with a {@link SplittableParDo}; returns every other transform unchanged.
+   */
+  @Override
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+      PTransform<InputT, OutputT> transform) {
+    if (!(transform instanceof ParDo.Bound)) {
+      return transform;
+    }
+    // NOTE(review): InputT/OutputT are the PInput/POutput types of this method but are reused
+    // below as DoFn element types; the casts are unchecked and only the erased types matter here.
+    ParDo.Bound<InputT, OutputT> that = (ParDo.Bound<InputT, OutputT>) transform;
+    DoFn<InputT, OutputT> fn = DoFnAdapters.getDoFn(that.getFn());
+    if (fn == null) {
+      // This is an OldDoFn, hence not splittable.
+      return transform;
+    }
+    DoFnSignature signature = DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass());
+    if (!signature.processElement().isSplittable()) {
+      return transform;
+    }
+    return new SplittableParDo(fn);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
new file mode 100644
index 0000000..84a0cd9
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.MutableDateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} behavior
+ * using the direct runner.
+ *
+ * <p>TODO: make this use @RunnableOnService.
+ */
+@RunWith(JUnit4.class)
+public class SplittableDoFnTest {
+  /** A half-open range {@code [from, to)} of integer offsets, used as the test restriction. */
+  static class OffsetRange implements Serializable {
+    public final int from;
+    public final int to;
+
+    OffsetRange(int from, int to) {
+      this.from = from;
+      this.to = to;
+    }
+
+    @Override
+    public String toString() {
+      return "[" + from + ", " + to + ")";
+    }
+  }
+
+  /**
+   * A {@link RestrictionTracker} over an {@link OffsetRange} whose offsets must be claimed in
+   * strictly increasing order.
+   */
+  private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
+    private OffsetRange range;
+    // Last offset successfully claimed by tryClaim, or null if nothing has been claimed yet.
+    private Integer lastClaimedIndex = null;
+
+    OffsetRangeTracker(OffsetRange range) {
+      this.range = checkNotNull(range);
+    }
+
+    @Override
+    public OffsetRange currentRestriction() {
+      return range;
+    }
+
+    /**
+     * Shrinks the current restriction to the offsets claimed so far and returns the unclaimed
+     * remainder. If nothing has been claimed, the whole range is returned and the current
+     * restriction becomes empty.
+     */
+    @Override
+    public OffsetRange checkpoint() {
+      if (lastClaimedIndex == null) {
+        OffsetRange res = range;
+        range = new OffsetRange(range.from, range.from);
+        return res;
+      }
+      OffsetRange res = new OffsetRange(lastClaimedIndex + 1, range.to);
+      this.range = new OffsetRange(range.from, lastClaimedIndex + 1);
+      return res;
+    }
+
+    /**
+     * Attempts to claim offset {@code i}; returns {@code false} if {@code i} is at or past the end
+     * of the current range. Offsets must be claimed in strictly increasing order.
+     */
+    boolean tryClaim(int i) {
+      checkState(
+          lastClaimedIndex == null || i > lastClaimedIndex,
+          "Trying to claim offset %s, but offset %s was already claimed",
+          i,
+          lastClaimedIndex);
+      if (i >= range.to) {
+        return false;
+      }
+      lastClaimedIndex = i;
+      return true;
+    }
+  }
+
+  /**
+   * Splittable {@link DoFn} that outputs {@code KV(element, i)} for every offset {@code i} in
+   * {@code [0, element.length())}. It returns {@link ProcessContinuation#resume()} after every
+   * offset divisible by 3, so processing of a single element is checkpointed and resumed.
+   */
+  static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
+    @ProcessElement
+    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
+      for (int i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
+        c.output(KV.of(c.element(), i));
+        if (i % 3 == 0) {
+          return ProcessContinuation.resume();
+        }
+      }
+      return ProcessContinuation.stop();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(String element) {
+      return new OffsetRange(0, element.length());
+    }
+
+    /** Splits a range into two halves at its midpoint. */
+    @SplitRestriction
+    public void splitRange(
+        String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
+      // Unsigned shift avoids int overflow of (from + to) for non-negative offsets.
+      int mid = (range.from + range.to) >>> 1;
+      receiver.output(new OffsetRange(range.from, mid));
+      receiver.output(new OffsetRange(mid, range.to));
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(OffsetRange range) {
+      return new OffsetRangeTracker(range);
+    }
+  }
+
+  /** Pairs each element with its timestamp so the timestamps can be asserted on. */
+  private static class ReifyTimestampsFn<T> extends DoFn<T, TimestampedValue<T>> {
+    @ProcessElement
+    public void process(ProcessContext c) {
+      c.output(TimestampedValue.of(c.element(), c.timestamp()));
+    }
+  }
+
+  @Test
+  public void testPairWithIndexBasic() throws ClassNotFoundException {
+    Pipeline p = TestPipeline.create();
+    p.getOptions().setRunner(DirectRunner.class);
+    PCollection<KV<String, Integer>> res =
+        p.apply(Create.of("a", "bb", "ccccc"))
+            .apply(ParDo.of(new PairStringWithIndexToLength()))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+    PAssert.that(res)
+        .containsInAnyOrder(
+            Arrays.asList(
+                KV.of("a", 0),
+                KV.of("bb", 0),
+                KV.of("bb", 1),
+                KV.of("ccccc", 0),
+                KV.of("ccccc", 1),
+                KV.of("ccccc", 2),
+                KV.of("ccccc", 3),
+                KV.of("ccccc", 4)));
+
+    p.run();
+  }
+
+  @Test
+  public void testPairWithIndexWindowedTimestamped() throws ClassNotFoundException {
+    // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
+    // of elements in the input collection.
+    Pipeline p = TestPipeline.create();
+    p.getOptions().setRunner(DirectRunner.class);
+
+    // Truncate to a whole second so that now - i lines up with the start of one of the
+    // 1-second-period sliding windows below (assuming the default zero window offset).
+    MutableDateTime mutableNow = Instant.now().toMutableDateTime();
+    mutableNow.setMillisOfSecond(0);
+    Instant now = mutableNow.toInstant();
+    Instant nowP1 = now.plus(Duration.standardSeconds(1));
+    Instant nowP2 = now.plus(Duration.standardSeconds(2));
+
+    SlidingWindows windowFn =
+        SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
+    PCollection<KV<String, Integer>> res =
+        p.apply(
+                Create.timestamped(
+                    TimestampedValue.of("a", now),
+                    TimestampedValue.of("bb", nowP1),
+                    TimestampedValue.of("ccccc", nowP2)))
+            .apply(Window.<String>into(windowFn))
+            .apply(ParDo.of(new PairStringWithIndexToLength()))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+    assertEquals(windowFn, res.getWindowingStrategy().getWindowFn());
+
+    PCollection<TimestampedValue<KV<String, Integer>>> timestamped =
+        res.apply("Reify timestamps", ParDo.of(new ReifyTimestampsFn<KV<String, Integer>>()));
+
+    // Every output keeps the timestamp of the input element it was produced from. This list does
+    // not depend on the window being checked, so it is built once outside the loop.
+    List<TimestampedValue<KV<String, Integer>>> expectedUnfiltered =
+        Arrays.asList(
+            TimestampedValue.of(KV.of("a", 0), now),
+            TimestampedValue.of(KV.of("bb", 0), nowP1),
+            TimestampedValue.of(KV.of("bb", 1), nowP1),
+            TimestampedValue.of(KV.of("ccccc", 0), nowP2),
+            TimestampedValue.of(KV.of("ccccc", 1), nowP2),
+            TimestampedValue.of(KV.of("ccccc", 2), nowP2),
+            TimestampedValue.of(KV.of("ccccc", 3), nowP2),
+            TimestampedValue.of(KV.of("ccccc", 4), nowP2));
+
+    // Check the 5-second windows starting at now, now - 1s, now - 2s and now - 3s; each must
+    // contain exactly the outputs whose timestamps fall inside it.
+    for (int i = 0; i < 4; ++i) {
+      Instant base = now.minus(Duration.standardSeconds(i));
+      IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5)));
+
+      List<TimestampedValue<KV<String, Integer>>> expected = new ArrayList<>();
+      for (TimestampedValue<KV<String, Integer>> tv : expectedUnfiltered) {
+        if (!window.start().isAfter(tv.getTimestamp())
+            && !tv.getTimestamp().isAfter(window.maxTimestamp())) {
+          expected.add(tv);
+        }
+      }
+      assertFalse(expected.isEmpty());
+
+      PAssert.that(timestamped).inWindow(window).containsInAnyOrder(expected);
+    }
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index f806926..789f4b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -76,6 +76,12 @@ public @interface Experimental {
     TIMERS,
 
     /** Experimental APIs related to customizing the output time for computed 
values. */
-    OUTPUT_TIME
+    OUTPUT_TIME,
+
+    /**
+     * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
+     * Do not use: API is unstable and runner support is incomplete.
+     */
+    SPLITTABLE_DO_FN,
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index fb7fbd4..62da28c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -21,6 +21,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
@@ -29,6 +30,8 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -37,9 +40,11 @@ import 
org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -341,14 +346,20 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
      */
     @Deprecated
     WindowingInternals<InputT, OutputT> windowingInternals();
+
+    /**
+     * If this is a splittable {@link DoFn}, returns the {@link 
RestrictionTracker} associated with
+     * the current {@link ProcessElement} call.
+     */
+    <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker();
   }
 
-  /** A placeholder for testing handling of output types during {@link DoFn} reflection. */
+  /**
+   * Receives values of the given type. For example, a {@link SplitRestriction} method emits the
+   * parts of a split restriction by calling {@link #output} on an {@code OutputReceiver}.
+   */
   public interface OutputReceiver<T> {
     void output(T output);
   }
 
-  /** A placeholder for testing handling of input types during {@link DoFn} reflection. */
+  /** Provides a single value of the given type via {@link #get()}. */
   public interface InputProvider<T> {
     T get();
   }
@@ -375,6 +386,10 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
     public WindowingInternals<InputT, OutputT> windowingInternals() {
       return null;
     }
+
+    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+      return null;
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -412,14 +427,57 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
   public @interface StartBundle {}
 
   /**
-   * Annotation for the method to use for processing elements. A subclass of
-   * {@link DoFn} must have a method with this annotation satisfying
-   * the following constraints in order for it to be executable:
+   * Annotation for the method to use for processing elements. A subclass of 
{@link DoFn} must have
+   * a method with this annotation.
+   *
+   * <p>The signature of this method must satisfy the following constraints:
+   *
    * <ul>
-   *   <li>It must have at least one argument.
-   *   <li>Its first argument must be a {@link DoFn.ProcessContext}.
-   *   <li>Its remaining argument, if any, must be {@link BoundedWindow}.
+   * <li>Its first argument must be a {@link DoFn.ProcessContext}.
+   * <li>If one of its arguments is a subtype of {@link RestrictionTracker}, then it is a <a
+   *     href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} subject to the
+   *     separate requirements described below. The items below assume that this is not a
+   *     splittable {@link DoFn}.
+   * <li>If one of its arguments is {@link BoundedWindow}, this argument 
corresponds to the window
+   *     of the current element. If absent, a runner may perform additional 
optimizations.
+   * <li>It must return {@code void}.
    * </ul>
+   *
+   * <h2>Splittable DoFns (WARNING: work in progress, do not use)</h2>
+   *
+   * <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} 
method has a parameter
+   * whose type is a subtype of {@link RestrictionTracker}. This is an 
advanced feature and an
+   * overwhelming majority of users will never need to write a splittable 
{@link DoFn}. Right now
+   * the implementation of this feature is in progress and it's not ready for 
any use.
+   *
+   * <p>See <a href="https://s.apache.org/splittable-do-fn">the proposal</a> for an overview of the
+   * involved concepts (<i>splittable DoFn</i>, <i>restriction</i>, <i>restriction tracker</i>).
+   *
+   * <p>If a {@link DoFn} is splittable, the following constraints must be 
respected:
+   *
+   * <ul>
+   * <li>It <i>must</i> define a {@link GetInitialRestriction} method.
+   * <li>It <i>may</i> define a {@link SplitRestriction} method.
+   * <li>It <i>must</i> define a {@link NewTracker} method returning the same 
type as the type of
+   *     the {@link RestrictionTracker} argument of {@link ProcessElement}, 
which in turn must be a
+   *     subtype of {@code RestrictionTracker<R>} where {@code R} is the 
restriction type returned
+   *     by {@link GetInitialRestriction}.
+   * <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
+   * <li>The type of restrictions used by all of these methods must be the 
same.
+   * <li>Its {@link ProcessElement} method <i>may</i> return a {@link 
ProcessContinuation} to
+   *     indicate whether there is more work to be done for the current 
element.
+   * <li>Its {@link ProcessElement} method <i>must not</i> use any extra 
context parameters, such as
+   *     {@link BoundedWindow}.
+   * <li>The {@link DoFn} itself <i>may</i> be annotated with {@link 
BoundedPerElement} or
+   *     {@link UnboundedPerElement}, but not both at the same time. If it's 
not annotated with
+   *     either of these, it's assumed to be {@link BoundedPerElement} if its 
{@link
+   *     ProcessElement} method returns {@code void} and {@link 
UnboundedPerElement} if it
+   *     returns a {@link ProcessContinuation}.
+   * </ul>
+   *
+   * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these 
methods.
+   *
+   * <p>More documentation will be added when the feature becomes ready for 
general usage.
    */
   @Documented
   @Retention(RetentionPolicy.RUNTIME)
@@ -455,6 +513,150 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
   }
 
   /**
+   * Annotation for the method that maps an element to an initial restriction for a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>Required: every splittable {@link DoFn} must define this method.
+   *
+   * <p>Signature: {@code RestrictionT getInitialRestriction(InputT element);}
+   *
+   * <p>TODO: Make the InputT parameter optional.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface GetInitialRestriction {}
+
+  /**
+   * Annotation for the method that returns the coder to use for the restriction of a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>Optional: if not defined, a coder will be inferred using standard coder inference rules and
+   * the pipeline's {@link Pipeline#getCoderRegistry coder registry}.
+   *
+   * <p>This method will be called only at pipeline construction time.
+   *
+   * <p>Signature: {@code Coder<RestrictionT> getRestrictionCoder();}
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface GetRestrictionCoder {}
+
+  /**
+   * Annotation for the method that splits the restriction of a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into multiple parts to
+   * be processed in parallel.
+   *
+   * <p>Signature: {@code void splitRestriction(InputT element, RestrictionT restriction,
+   * OutputReceiver<RestrictionT> receiver);} where each part is emitted by calling {@code
+   * receiver.output(part)}.
+   *
+   * <p>Optional: if this method is omitted, the restriction will not be split (equivalent to
+   * defining the method and outputting {@code restriction} unchanged as the only part).
+   *
+   * <p>TODO: Introduce a parameter for controlling granularity of splitting, e.g. numParts.
+   *
+   * <p>TODO: Make the InputT parameter optional.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface SplitRestriction {}
+
+  /**
+   * Annotation for the method that creates a new {@link RestrictionTracker} for the restriction of
+   * a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>Required: every splittable {@link DoFn} must define this method, and its return type must be
+   * the same as the type of the {@link RestrictionTracker} parameter of the {@link ProcessElement}
+   * method.
+   *
+   * <p>Signature: {@code MyRestrictionTracker newTracker(RestrictionT restriction);} where {@code
+   * MyRestrictionTracker} must be a subtype of {@code RestrictionTracker<RestrictionT>}.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface NewTracker {}
+
+  /**
+   * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
+   * specifying that the {@link DoFn} performs a bounded amount of work per input element, so
+   * applying it to a bounded {@link PCollection} will also produce a bounded {@link PCollection}.
+   *
+   * <p>It is an error to specify this on a non-splittable {@link DoFn}, or together with {@link
+   * UnboundedPerElement}. If neither annotation is present, a splittable {@link DoFn} is assumed to
+   * be {@link BoundedPerElement} when its {@link ProcessElement} method returns {@code void}.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.TYPE)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface BoundedPerElement {}
+
+  /**
+   * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
+   * specifying that the {@link DoFn} performs an unbounded amount of work per input element, so
+   * applying it to a bounded {@link PCollection} will produce an unbounded {@link PCollection}.
+   *
+   * <p>It is an error to specify this on a non-splittable {@link DoFn}, or together with {@link
+   * BoundedPerElement}. If neither annotation is present, a splittable {@link DoFn} is assumed to
+   * be {@link UnboundedPerElement} when its {@link ProcessElement} method returns a {@link
+   * ProcessContinuation}.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.TYPE)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface UnboundedPerElement {}
+
+  // Shared instance returned by ProcessContinuation.stop(). It cannot be a static field of
+  // ProcessContinuation itself: that class's static initializer would then instantiate its own
+  // generated subclass AutoValue_DoFn_ProcessContinuation, which risks a class-initialization
+  // deadlock; see
+  // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
+  private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
+      new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO, null);
+
+  /**
+   * When used as a return value of {@link ProcessElement}, indicates whether there is more work to
+   * be done for the current element.
+   *
+   * <p>{@link #withResumeDelay} and {@link #withWatermark} return a new instance rather than
+   * modifying the one they are called on.
+   */
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  @AutoValue
+  public abstract static class ProcessContinuation {
+    /**
+     * Indicates that there is no more work to be done for the current element. Always returns the
+     * same shared instance.
+     */
+    public static ProcessContinuation stop() {
+      return PROCESS_CONTINUATION_STOP;
+    }
+
+    /**
+     * Indicates that there is more work to be done for the current element. The result has a zero
+     * {@link #resumeDelay()} and no {@link #getWatermark() watermark}.
+     */
+    public static ProcessContinuation resume() {
+      return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO, null);
+    }
+
+    // NOTE: the generated AutoValue_DoFn_ProcessContinuation constructor takes the properties below
+    // in declaration order (shouldResume, resumeDelay, getWatermark); reordering them would break
+    // every constructor call in this file.
+
+    /**
+     * If false, the {@link DoFn} promises that there is no more work remaining for the current
+     * element, so the runner should not resume the {@link ProcessElement} call.
+     */
+    public abstract boolean shouldResume();
+
+    /**
+     * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
+     * the {@link ProcessElement} call continuing processing of the same element. Both {@link
+     * #stop()} and {@link #resume()} set it to zero.
+     */
+    public abstract Duration resumeDelay();
+
+    /**
+     * A lower bound provided by the {@link DoFn} on timestamps of the output that will be emitted
+     * by future {@link ProcessElement} calls continuing processing of the current element.
+     *
+     * <p>A runner should treat an absent ({@code null}) value as equivalent to the timestamp of the
+     * input element.
+     */
+    @Nullable
+    public abstract Instant getWatermark();
+
+    /**
+     * Returns a copy of this {@link ProcessContinuation} with {@link #resumeDelay()} replaced by
+     * {@code resumeDelay}.
+     */
+    public ProcessContinuation withResumeDelay(Duration resumeDelay) {
+      return new AutoValue_DoFn_ProcessContinuation(
+          shouldResume(), resumeDelay, getWatermark());
+    }
+
+    /**
+     * Returns a copy of this {@link ProcessContinuation} with {@link #getWatermark()} replaced by
+     * {@code watermark}.
+     */
+    public ProcessContinuation withWatermark(Instant watermark) {
+      return new AutoValue_DoFn_ProcessContinuation(
+          shouldResume(), resumeDelay(), watermark);
+    }
+  }
+
+  /**
    * Returns an {@link Aggregator} with aggregation logic specified by the
    * {@link CombineFn} argument. The name provided must be unique across
    * {@link Aggregator}s created within the {@link DoFn}. Aggregators can only 
be created


Reply via email to