Repository: incubator-beam
Updated Branches:
  refs/heads/master a5d129361 -> 13b45895e


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index d057765..0bfe2be 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -17,14 +17,32 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-
+import static org.mockito.Mockito.when;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.GetInitialRestriction;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import 
org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowingInternals;
@@ -34,7 +52,9 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.AdditionalAnswers;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
 /** Tests for {@link DoFnInvokers}. */
@@ -76,11 +96,16 @@ public class DoFnInvokersTest {
           public WindowingInternals<String, String> windowingInternals() {
             return mockWindowingInternals;
           }
+
+          @Override
+          public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+            return null;
+          }
         };
   }
 
-  private void invokeProcessElement(DoFn<String, String> fn) {
-    DoFnInvokers.INSTANCE
+  private ProcessContinuation invokeProcessElement(DoFn<String, String> fn) {
+    return DoFnInvokers.INSTANCE
         .newByteBuddyInvoker(fn)
         .invokeProcessElement(mockContext, extraContextFactory);
   }
@@ -106,9 +131,9 @@ public class DoFnInvokersTest {
       @ProcessElement
       public void processElement(ProcessContext c) throws Exception {}
     }
-    MockFn fn = mock(MockFn.class);
-    invokeProcessElement(fn);
-    verify(fn).processElement(mockContext);
+    MockFn mockFn = mock(MockFn.class);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(mockFn));
+    verify(mockFn).processElement(mockContext);
   }
 
   interface InterfaceWithProcessElement {
@@ -128,7 +153,7 @@ public class DoFnInvokersTest {
   public void testDoFnWithProcessElementInterface() throws Exception {
     IdentityUsingInterfaceWithProcessElement fn =
         mock(IdentityUsingInterfaceWithProcessElement.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockContext);
   }
 
@@ -149,14 +174,14 @@ public class DoFnInvokersTest {
   @Test
   public void testDoFnWithMethodInSuperclass() throws Exception {
     IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).process(mockContext);
   }
 
   @Test
   public void testDoFnWithMethodInSubclass() throws Exception {
     IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).process(mockContext);
   }
 
@@ -167,7 +192,7 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c, BoundedWindow w) throws 
Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockContext, mockWindow);
   }
 
@@ -178,7 +203,7 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c, OutputReceiver<String> o) 
throws Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockContext, mockOutputReceiver);
   }
 
@@ -189,11 +214,35 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c, InputProvider<String> o) 
throws Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockContext, mockInputProvider);
   }
 
   @Test
+  public void testDoFnWithReturn() throws Exception {
+    class MockFn extends DoFn<String, String> {
+      @DoFn.ProcessElement
+      public ProcessContinuation processElement(ProcessContext c, 
SomeRestrictionTracker tracker)
+          throws Exception {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(String element) {
+        return null;
+      }
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+    }
+    MockFn fn = mock(MockFn.class);
+    when(fn.processElement(mockContext, 
null)).thenReturn(ProcessContinuation.resume());
+    assertEquals(ProcessContinuation.resume(), invokeProcessElement(fn));
+  }
+
+  @Test
   public void testDoFnWithStartBundleSetupTeardown() throws Exception {
     class MockFn extends DoFn<String, String> {
       @ProcessElement
@@ -224,6 +273,154 @@ public class DoFnInvokersTest {
   }
 
   // 
---------------------------------------------------------------------------------------
+  // Tests for invoking Splittable DoFn methods
+  // 
---------------------------------------------------------------------------------------
+  private static class SomeRestriction {}
+
+  private abstract static class SomeRestrictionTracker
+      implements RestrictionTracker<SomeRestriction> {}
+
+  private static class SomeRestrictionCoder extends 
CustomCoder<SomeRestriction> {
+    public static SomeRestrictionCoder of() {
+      return new SomeRestrictionCoder();
+    }
+
+    @Override
+    public void encode(SomeRestriction value, OutputStream outStream, Context 
context) {}
+
+    @Override
+    public SomeRestriction decode(InputStream inStream, Context context) {
+      return null;
+    }
+  }
+
+  /** Public so Mockito can do "delegatesTo()" in the test below. */
+  public static class MockFn extends DoFn<String, String> {
+    @ProcessElement
+    public ProcessContinuation processElement(ProcessContext c, 
SomeRestrictionTracker tracker) {
+      return null;
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(String element) {
+      return null;
+    }
+
+    @SplitRestriction
+    public void splitRestriction(
+        String element, SomeRestriction restriction, 
OutputReceiver<SomeRestriction> receiver) {}
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return null;
+    }
+
+    @GetRestrictionCoder
+    public SomeRestrictionCoder getRestrictionCoder() {
+      return null;
+    }
+  }
+
+  @Test
+  public void testSplittableDoFnWithAllMethods() throws Exception {
+    MockFn fn = mock(MockFn.class);
+    DoFnInvoker<String, String> invoker = 
DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    final SomeRestrictionTracker tracker = mock(SomeRestrictionTracker.class);
+    final SomeRestrictionCoder coder = mock(SomeRestrictionCoder.class);
+    SomeRestriction restriction = new SomeRestriction();
+    final SomeRestriction part1 = new SomeRestriction();
+    final SomeRestriction part2 = new SomeRestriction();
+    final SomeRestriction part3 = new SomeRestriction();
+    when(fn.getRestrictionCoder()).thenReturn(coder);
+    when(fn.getInitialRestriction("blah")).thenReturn(restriction);
+    doAnswer(
+            AdditionalAnswers.delegatesTo(
+                new MockFn() {
+                  @DoFn.SplitRestriction
+                  @Override
+                  public void splitRestriction(
+                      String element,
+                      SomeRestriction restriction,
+                      DoFn.OutputReceiver<SomeRestriction> receiver) {
+                    receiver.output(part1);
+                    receiver.output(part2);
+                    receiver.output(part3);
+                  }
+                }))
+        .when(fn)
+        .splitRestriction(
+            eq("blah"), same(restriction), 
Mockito.<DoFn.OutputReceiver<SomeRestriction>>any());
+    when(fn.newTracker(restriction)).thenReturn(tracker);
+    when(fn.processElement(mockContext, 
tracker)).thenReturn(ProcessContinuation.resume());
+
+    assertEquals(coder, invoker.invokeGetRestrictionCoder(new 
CoderRegistry()));
+    assertEquals(restriction, invoker.invokeGetInitialRestriction("blah"));
+    final List<SomeRestriction> outputs = new ArrayList<>();
+    invoker.invokeSplitRestriction(
+        "blah",
+        restriction,
+        new DoFn.OutputReceiver<SomeRestriction>() {
+          @Override
+          public void output(SomeRestriction output) {
+            outputs.add(output);
+          }
+        });
+    assertEquals(Arrays.asList(part1, part2, part3), outputs);
+    assertEquals(tracker, invoker.invokeNewTracker(restriction));
+    assertEquals(
+        ProcessContinuation.resume(),
+        invoker.invokeProcessElement(
+            mockContext,
+            new DoFn.FakeExtraContextFactory<String, String>() {
+              @Override
+              public RestrictionTracker restrictionTracker() {
+                return tracker;
+              }
+            }));
+  }
+
+  @Test
+  public void testSplittableDoFnDefaultMethods() throws Exception {
+    class MockFn extends DoFn<String, String> {
+      @ProcessElement
+      public void processElement(ProcessContext c, SomeRestrictionTracker 
tracker) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(String element) {
+        return null;
+      }
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+    }
+    MockFn fn = mock(MockFn.class);
+    DoFnInvoker<String, String> invoker = 
DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+
+    CoderRegistry coderRegistry = new CoderRegistry();
+    coderRegistry.registerCoder(SomeRestriction.class, 
SomeRestrictionCoder.class);
+    assertThat(
+        invoker.<SomeRestriction>invokeGetRestrictionCoder(coderRegistry),
+        instanceOf(SomeRestrictionCoder.class));
+    invoker.invokeSplitRestriction(
+        "blah",
+        "foo",
+        new DoFn.OutputReceiver<String>() {
+          private boolean invoked;
+
+          @Override
+          public void output(String output) {
+            assertFalse(invoked);
+            invoked = true;
+            assertEquals("foo", output);
+          }
+        });
+    assertEquals(
+        ProcessContinuation.stop(), invoker.invokeProcessElement(mockContext, 
extraContextFactory));
+  }
+
+  // 
---------------------------------------------------------------------------------------
   // Tests for ability to invoke private, inner and anonymous classes.
   // 
---------------------------------------------------------------------------------------
 
@@ -235,14 +432,14 @@ public class DoFnInvokersTest {
   @Test
   public void testLocalPrivateDoFnClass() throws Exception {
     PrivateDoFnClass fn = mock(PrivateDoFnClass.class);
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     verify(fn).processThis(mockContext);
   }
 
   @Test
   public void testStaticPackagePrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = 
mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockContext);
   }
 
@@ -250,28 +447,28 @@ public class DoFnInvokersTest {
   public void testInnerPackagePrivateDoFnClass() throws Exception {
     DoFn<String, String> fn =
         mock(new 
DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockContext);
   }
 
   @Test
   public void testStaticPrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = 
mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockContext);
   }
 
   @Test
   public void testInnerPrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(new 
DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockContext);
   }
 
   @Test
   public void testAnonymousInnerDoFn() throws Exception {
     DoFn<String, String> fn = mock(new 
DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockContext);
   }
 
@@ -279,7 +476,7 @@ public class DoFnInvokersTest {
   public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
     // Can't use mockito for this one - the anonymous class is final and can't 
be mocked.
     DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFn();
-    invokeProcessElement(fn);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyStaticAnonymousDoFnInvoked(fn, mockContext);
   }
 
@@ -303,6 +500,32 @@ public class DoFnInvokersTest {
   }
 
   @Test
+  public void testProcessElementExceptionWithReturn() throws Exception {
+    thrown.expect(UserCodeException.class);
+    thrown.expectMessage("bogus");
+    DoFnInvokers.INSTANCE
+        .newByteBuddyInvoker(
+            new DoFn<Integer, Integer>() {
+              @ProcessElement
+              public ProcessContinuation processElement(
+                  @SuppressWarnings("unused") ProcessContext c, 
SomeRestrictionTracker tracker) {
+                throw new IllegalArgumentException("bogus");
+              }
+
+              @GetInitialRestriction
+              public SomeRestriction getInitialRestriction(Integer element) {
+                return null;
+              }
+
+              @NewTracker
+              public SomeRestrictionTracker newTracker(SomeRestriction 
restriction) {
+                return null;
+              }
+            })
+        .invokeProcessElement(null, new DoFn.FakeExtraContextFactory<Integer, 
Integer>());
+  }
+
+  @Test
   public void testStartBundleException() throws Exception {
     DoFnInvoker<Integer, Integer> invoker =
         DoFnInvokers.INSTANCE.newByteBuddyInvoker(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
index c269dbd..9cb1d23 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -61,7 +61,7 @@ public class DoFnSignaturesProcessElementTest {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(
         "Integer is not a valid context parameter. "
-            + "Should be one of [BoundedWindow]");
+            + "Should be one of [BoundedWindow, RestrictionTracker<?>]");
 
     analyzeProcessElementMethod(
         new AnonymousMethod() {
@@ -72,7 +72,7 @@ public class DoFnSignaturesProcessElementTest {
   @Test
   public void testBadReturnType() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Must return void");
+    thrown.expectMessage("Must return void or ProcessContinuation");
 
     analyzeProcessElementMethod(
         new AnonymousMethod() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
new file mode 100644
index 0000000..a9a7c81
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static 
org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.analyzeProcessElementMethod;
+import static 
org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.errors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.reflect.TypeToken;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DoFnSignatures} focused on methods related to <a
+ * href="https://s.apache.org/splittable-do-fn";>splittable</a> {@link DoFn}.
+ */
+@SuppressWarnings("unused")
+@RunWith(JUnit4.class)
+public class DoFnSignaturesSplittableDoFnTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private static class SomeRestriction {}
+
+  private abstract static class SomeRestrictionTracker
+      implements RestrictionTracker<SomeRestriction> {}
+
+  private abstract static class SomeRestrictionCoder implements 
Coder<SomeRestriction> {}
+
+  @Test
+  public void testReturnsProcessContinuation() throws Exception {
+    DoFnSignature.ProcessElementMethod signature =
+        analyzeProcessElementMethod(
+            new AnonymousMethod() {
+              private DoFn.ProcessContinuation method(
+                  DoFn<Integer, String>.ProcessContext context) {
+                return null;
+              }
+            });
+
+    assertTrue(signature.hasReturnValue());
+  }
+
+  @Test
+  public void testHasRestrictionTracker() throws Exception {
+    DoFnSignature.ProcessElementMethod signature =
+        analyzeProcessElementMethod(
+            new AnonymousMethod() {
+              private void method(
+                  DoFn<Integer, String>.ProcessContext context, 
SomeRestrictionTracker tracker) {}
+            });
+
+    assertTrue(signature.isSplittable());
+    
assertTrue(signature.extraParameters().contains(DoFnSignature.Parameter.RESTRICTION_TRACKER));
+    assertEquals(SomeRestrictionTracker.class, 
signature.trackerT().getRawType());
+  }
+
+  @Test
+  public void testSplittableProcessElementMustNotHaveOtherParams() throws 
Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("must not have any extra context arguments");
+    thrown.expectMessage("BOUNDED_WINDOW");
+
+    DoFnSignature.ProcessElementMethod signature =
+        analyzeProcessElementMethod(
+            new AnonymousMethod() {
+              private void method(
+                  DoFn<Integer, String>.ProcessContext context,
+                  SomeRestrictionTracker tracker,
+                  BoundedWindow window) {}
+            });
+  }
+
+  @Test
+  public void testInfersBoundednessFromAnnotation() throws Exception {
+    class BaseSplittableFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void processElement(ProcessContext context, 
SomeRestrictionTracker tracker) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+    }
+
+    @BoundedPerElement
+    class BoundedSplittableFn extends BaseSplittableFn {}
+
+    @UnboundedPerElement
+    class UnboundedSplittableFn extends BaseSplittableFn {}
+
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(BaseSplittableFn.class)
+            .isBoundedPerElement());
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(BoundedSplittableFn.class)
+            .isBoundedPerElement());
+    assertEquals(
+        PCollection.IsBounded.UNBOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(UnboundedSplittableFn.class)
+            .isBoundedPerElement());
+  }
+
+  @Test
+  public void testUnsplittableIsBounded() throws Exception {
+    class UnsplittableFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context) {}
+    }
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(UnsplittableFn.class)
+            .isBoundedPerElement());
+  }
+
+  private static class BaseFnWithContinuation extends DoFn<Integer, String> {
+    @ProcessElement
+    public ProcessContinuation processElement(
+        ProcessContext context, SomeRestrictionTracker tracker) {
+      return null;
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer element) {
+      return null;
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return null;
+    }
+  }
+
+  @Test
+  public void testSplittableIsBoundedByDefault() throws Exception {
+    assertEquals(
+        PCollection.IsBounded.UNBOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(BaseFnWithContinuation.class)
+            .isBoundedPerElement());
+  }
+
+  @Test
+  public void testSplittableRespectsBoundednessAnnotation() throws Exception {
+    @BoundedPerElement
+    class BoundedFnWithContinuation extends BaseFnWithContinuation {}
+
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(BoundedFnWithContinuation.class)
+            .isBoundedPerElement());
+
+    @UnboundedPerElement
+    class UnboundedFnWithContinuation extends BaseFnWithContinuation {}
+
+    assertEquals(
+        PCollection.IsBounded.UNBOUNDED,
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(UnboundedFnWithContinuation.class)
+            .isBoundedPerElement());
+  }
+
+  @Test
+  public void testUnsplittableButDeclaresBounded() throws Exception {
+    @BoundedPerElement
+    class SomeFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context) {}
+    }
+
+    thrown.expectMessage("Non-splittable, but annotated as @Bounded");
+    DoFnSignatures.INSTANCE.getOrParseSignature(SomeFn.class);
+  }
+
+  @Test
+  public void testUnsplittableButDeclaresUnbounded() throws Exception {
+    @UnboundedPerElement
+    class SomeFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context) {}
+    }
+
+    thrown.expectMessage("Non-splittable, but annotated as @Unbounded");
+    DoFnSignatures.INSTANCE.getOrParseSignature(SomeFn.class);
+  }
+
+  /** Tests a splittable {@link DoFn} that defines all methods in their full 
form, correctly. */
+  @Test
+  public void testSplittableWithAllFunctions() throws Exception {
+    class GoodSplittableDoFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public ProcessContinuation processElement(
+          ProcessContext context, SomeRestrictionTracker tracker) {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @SplitRestriction
+      public void splitRestriction(
+          Integer element, SomeRestriction restriction, 
OutputReceiver<SomeRestriction> receiver) {}
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+
+      @GetRestrictionCoder
+      public SomeRestrictionCoder getRestrictionCoder() {
+        return null;
+      }
+    }
+
+    DoFnSignature signature = 
DoFnSignatures.INSTANCE.getOrParseSignature(GoodSplittableDoFn.class);
+    assertEquals(SomeRestrictionTracker.class, 
signature.processElement().trackerT().getRawType());
+    assertTrue(signature.processElement().isSplittable());
+    assertTrue(signature.processElement().hasReturnValue());
+    assertEquals(
+        SomeRestriction.class, 
signature.getInitialRestriction().restrictionT().getRawType());
+    assertEquals(SomeRestriction.class, 
signature.splitRestriction().restrictionT().getRawType());
+    assertEquals(SomeRestrictionTracker.class, 
signature.newTracker().trackerT().getRawType());
+    assertEquals(SomeRestriction.class, 
signature.newTracker().restrictionT().getRawType());
+    assertEquals(SomeRestrictionCoder.class, 
signature.getRestrictionCoder().coderT().getRawType());
+  }
+
+  /**
+   * Tests a splittable {@link DoFn} that defines all methods in their full 
form, correctly, using
+   * generic types.
+   */
+  @Test
+  public void testSplittableWithAllFunctionsGeneric() throws Exception {
+    class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends 
DoFn<Integer, String> {
+      @ProcessElement
+      public ProcessContinuation processElement(ProcessContext context, 
TrackerT tracker) {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public RestrictionT getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @SplitRestriction
+      public void splitRestriction(
+          Integer element, RestrictionT restriction, 
OutputReceiver<RestrictionT> receiver) {}
+
+      @NewTracker
+      public TrackerT newTracker(RestrictionT restriction) {
+        return null;
+      }
+
+      @GetRestrictionCoder
+      public CoderT getRestrictionCoder() {
+        return null;
+      }
+    }
+
+    DoFnSignature signature =
+        DoFnSignatures.INSTANCE.getOrParseSignature(
+            new GoodGenericSplittableDoFn<
+                SomeRestriction, SomeRestrictionTracker, 
SomeRestrictionCoder>() {}.getClass());
+    assertEquals(SomeRestrictionTracker.class, 
signature.processElement().trackerT().getRawType());
+    assertTrue(signature.processElement().isSplittable());
+    assertTrue(signature.processElement().hasReturnValue());
+    assertEquals(
+        SomeRestriction.class, 
signature.getInitialRestriction().restrictionT().getRawType());
+    assertEquals(SomeRestriction.class, 
signature.splitRestriction().restrictionT().getRawType());
+    assertEquals(SomeRestrictionTracker.class, 
signature.newTracker().trackerT().getRawType());
+    assertEquals(SomeRestriction.class, 
signature.newTracker().restrictionT().getRawType());
+    assertEquals(SomeRestrictionCoder.class, 
signature.getRestrictionCoder().coderT().getRawType());
+  }
+
+  @Test
+  public void testSplittableMissingRequiredMethods() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context, SomeRestrictionTracker 
tracker) {}
+    }
+
+    thrown.expectMessage(
+        "Splittable, but does not define the following required methods: "
+            + "[@GetInitialRestriction, @NewTracker]");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testNewTrackerReturnsWrongType() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context, SomeRestrictionTracker 
tracker) {}
+
+      @NewTracker
+      public void newTracker(SomeRestriction restriction) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "Returns void, but must return a subtype of 
RestrictionTracker<SomeRestriction>");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testGetInitialRestrictionMismatchesNewTracker() throws Exception 
{
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context, SomeRestrictionTracker 
tracker) {}
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public String getInitialRestriction(Integer element) {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "getInitialRestriction(Integer): Uses restriction type String, but 
@NewTracker method");
+    thrown.expectMessage("newTracker(SomeRestriction) uses restriction type 
SomeRestriction");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testGetRestrictionCoderReturnsWrongType() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context, SomeRestrictionTracker 
tracker) {}
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @GetRestrictionCoder
+      public KvCoder getRestrictionCoder() {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "getRestrictionCoder() returns KvCoder which is not a subtype of 
Coder<SomeRestriction>");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testSplitRestrictionReturnsWrongType() throws Exception {
+    thrown.expectMessage(
+        "Third argument must be OutputReceiver<SomeRestriction>, but is 
OutputReceiver<String>");
+    DoFnSignatures.analyzeSplitRestrictionMethod(
+        errors(),
+        TypeToken.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          void method(
+              Integer element, SomeRestriction restriction, 
DoFn.OutputReceiver<String> receiver) {}
+        }.getMethod(),
+        TypeToken.of(Integer.class));
+  }
+
+  @Test
+  public void testSplitRestrictionWrongElementArgument() throws Exception {
+    class BadFn {
+      private List<SomeRestriction> splitRestriction(String element, 
SomeRestriction restriction) {
+        return null;
+      }
+    }
+
+    thrown.expectMessage("First argument must be the element type Integer");
+    DoFnSignatures.analyzeSplitRestrictionMethod(
+        errors(),
+        TypeToken.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          void method(
+              String element,
+              SomeRestriction restriction,
+              DoFn.OutputReceiver<SomeRestriction> receiver) {}
+        }.getMethod(),
+        TypeToken.of(Integer.class));
+  }
+
+  @Test
+  public void testSplitRestrictionWrongNumArguments() throws Exception {
+    thrown.expectMessage("Must have exactly 3 arguments");
+    DoFnSignatures.analyzeSplitRestrictionMethod(
+        errors(),
+        TypeToken.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          private void method(
+              Integer element,
+              SomeRestriction restriction,
+              DoFn.OutputReceiver<SomeRestriction> receiver,
+              Object extra) {}
+        }.getMethod(),
+        TypeToken.of(Integer.class));
+  }
+
+  @Test
+  public void testSplitRestrictionConsistentButWrongType() throws Exception {
+    class OtherRestriction {}
+
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext context, SomeRestrictionTracker 
tracker) {}
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @DoFn.SplitRestriction
+      public void splitRestriction(
+          Integer element,
+          OtherRestriction restriction,
+          OutputReceiver<OtherRestriction> receiver) {}
+    }
+
+    thrown.expectMessage(
+        "getInitialRestriction(Integer): Uses restriction type 
SomeRestriction, "
+            + "but @SplitRestriction method ");
+    thrown.expectMessage(
+        "splitRestriction(Integer, OtherRestriction, OutputReceiver) "
+            + "uses restriction type OtherRestriction");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testUnsplittableMustNotDefineExtraMethods() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void processElement(ProcessContext context) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(Integer element) {
+        return null;
+      }
+
+      @SplitRestriction
+      public void splitRestriction(
+          Integer element, SomeRestriction restriction, 
OutputReceiver<SomeRestriction> receiver) {}
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+
+      @GetRestrictionCoder
+      public SomeRestrictionCoder getRestrictionCoder() {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "Non-splittable, but defines methods: "
+            + "[@GetInitialRestriction, @SplitRestriction, @NewTracker, 
@GetRestrictionCoder]");
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+  }
+
+  @Test
+  public void testNewTrackerWrongNumArguments() throws Exception {
+    thrown.expectMessage("Must have a single argument");
+    DoFnSignatures.analyzeNewTrackerMethod(
+        errors(),
+        TypeToken.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          private SomeRestrictionTracker method(SomeRestriction restriction, 
Object extra) {
+            return null;
+          }
+        }.getMethod());
+  }
+
+  @Test
+  public void testNewTrackerInconsistent() throws Exception {
+    thrown.expectMessage(
+        "Returns SomeRestrictionTracker, but must return a subtype of 
RestrictionTracker<String>");
+    DoFnSignatures.analyzeNewTrackerMethod(
+        errors(),
+        TypeToken.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          private SomeRestrictionTracker method(String restriction) {
+            return null;
+          }
+        }.getMethod());
+  }
+}


Reply via email to