Repository: incubator-beam Updated Branches: refs/heads/master a5d129361 -> 13b45895e
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index d057765..0bfe2be 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -17,14 +17,32 @@ */ package org.apache.beam.sdk.transforms.reflect; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; - +import static org.mockito.Mockito.when; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.GetInitialRestriction; +import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowingInternals; @@ -34,7 +52,9 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.AdditionalAnswers; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; /** Tests for {@link DoFnInvokers}. */ @@ -76,11 +96,16 @@ public class DoFnInvokersTest { public WindowingInternals<String, String> windowingInternals() { return mockWindowingInternals; } + + @Override + public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { + return null; + } }; } - private void invokeProcessElement(DoFn<String, String> fn) { - DoFnInvokers.INSTANCE + private ProcessContinuation invokeProcessElement(DoFn<String, String> fn) { + return DoFnInvokers.INSTANCE .newByteBuddyInvoker(fn) .invokeProcessElement(mockContext, extraContextFactory); } @@ -106,9 +131,9 @@ public class DoFnInvokersTest { @ProcessElement public void processElement(ProcessContext c) throws Exception {} } - MockFn fn = mock(MockFn.class); - invokeProcessElement(fn); - verify(fn).processElement(mockContext); + MockFn mockFn = mock(MockFn.class); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(mockFn)); + verify(mockFn).processElement(mockContext); } interface InterfaceWithProcessElement { @@ -128,7 +153,7 @@ public class DoFnInvokersTest { public void testDoFnWithProcessElementInterface() throws Exception { IdentityUsingInterfaceWithProcessElement fn = mock(IdentityUsingInterfaceWithProcessElement.class); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); verify(fn).processElement(mockContext); } @@ -149,14 +174,14 @@ public class DoFnInvokersTest { @Test public void testDoFnWithMethodInSuperclass() throws Exception { IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); verify(fn).process(mockContext); } @Test public void testDoFnWithMethodInSubclass() throws Exception { IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); verify(fn).process(mockContext); } @@ -167,7 +192,7 @@ public class DoFnInvokersTest { public void processElement(ProcessContext c, BoundedWindow w) throws Exception {} } MockFn fn = mock(MockFn.class); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); verify(fn).processElement(mockContext, mockWindow); } @@ -178,7 +203,7 @@ public class DoFnInvokersTest { public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {} } MockFn fn = mock(MockFn.class); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); verify(fn).processElement(mockContext, mockOutputReceiver); } @@ -189,11 +214,35 @@ public class DoFnInvokersTest { public void processElement(ProcessContext c, InputProvider<String> o) throws Exception {} } MockFn fn = mock(MockFn.class); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); verify(fn).processElement(mockContext, mockInputProvider); } @Test + public void testDoFnWithReturn() throws Exception { + class MockFn extends DoFn<String, String> { + @DoFn.ProcessElement + public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) + throws Exception { + return null; + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(String element) { + return null; + } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + } + MockFn fn = mock(MockFn.class); + when(fn.processElement(mockContext, null)).thenReturn(ProcessContinuation.resume()); + assertEquals(ProcessContinuation.resume(), invokeProcessElement(fn)); + } + + @Test public void testDoFnWithStartBundleSetupTeardown() throws Exception { class MockFn extends DoFn<String, String> { @ProcessElement @@ -224,6 +273,154 @@ public class DoFnInvokersTest { } // --------------------------------------------------------------------------------------- + // Tests for invoking Splittable DoFn methods + // --------------------------------------------------------------------------------------- + private static class SomeRestriction {} + + private abstract static class SomeRestrictionTracker + implements RestrictionTracker<SomeRestriction> {} + + private static class SomeRestrictionCoder extends CustomCoder<SomeRestriction> { + public static SomeRestrictionCoder of() { + return new SomeRestrictionCoder(); + } + + @Override + public void encode(SomeRestriction value, OutputStream outStream, Context context) {} + + @Override + public SomeRestriction decode(InputStream inStream, Context context) { + return null; + } + } + + /** Public so Mockito can do "delegatesTo()" in the test below. */ + public static class MockFn extends DoFn<String, String> { + @ProcessElement + public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) { + return null; + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(String element) { + return null; + } + + @SplitRestriction + public void splitRestriction( + String element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {} + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + + @GetRestrictionCoder + public SomeRestrictionCoder getRestrictionCoder() { + return null; + } + } + + @Test + public void testSplittableDoFnWithAllMethods() throws Exception { + MockFn fn = mock(MockFn.class); + DoFnInvoker<String, String> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + final SomeRestrictionTracker tracker = mock(SomeRestrictionTracker.class); + final SomeRestrictionCoder coder = mock(SomeRestrictionCoder.class); + SomeRestriction restriction = new SomeRestriction(); + final SomeRestriction part1 = new SomeRestriction(); + final SomeRestriction part2 = new SomeRestriction(); + final SomeRestriction part3 = new SomeRestriction(); + when(fn.getRestrictionCoder()).thenReturn(coder); + when(fn.getInitialRestriction("blah")).thenReturn(restriction); + doAnswer( + AdditionalAnswers.delegatesTo( + new MockFn() { + @DoFn.SplitRestriction + @Override + public void splitRestriction( + String element, + SomeRestriction restriction, + DoFn.OutputReceiver<SomeRestriction> receiver) { + receiver.output(part1); + receiver.output(part2); + receiver.output(part3); + } + })) + .when(fn) + .splitRestriction( + eq("blah"), same(restriction), Mockito.<DoFn.OutputReceiver<SomeRestriction>>any()); + when(fn.newTracker(restriction)).thenReturn(tracker); + when(fn.processElement(mockContext, tracker)).thenReturn(ProcessContinuation.resume()); + + assertEquals(coder, invoker.invokeGetRestrictionCoder(new CoderRegistry())); + assertEquals(restriction, invoker.invokeGetInitialRestriction("blah")); + final List<SomeRestriction> outputs = new ArrayList<>(); + invoker.invokeSplitRestriction( + "blah", + restriction, + new DoFn.OutputReceiver<SomeRestriction>() { + @Override + public void output(SomeRestriction output) { + outputs.add(output); + } + }); + assertEquals(Arrays.asList(part1, part2, part3), outputs); + assertEquals(tracker, invoker.invokeNewTracker(restriction)); + assertEquals( + ProcessContinuation.resume(), + invoker.invokeProcessElement( + mockContext, + new DoFn.FakeExtraContextFactory<String, String>() { + @Override + public RestrictionTracker restrictionTracker() { + return tracker; + } + })); + } + + @Test + public void testSplittableDoFnDefaultMethods() throws Exception { + class MockFn extends DoFn<String, String> { + @ProcessElement + public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {} + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(String element) { + return null; + } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + } + MockFn fn = mock(MockFn.class); + DoFnInvoker<String, String> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + + CoderRegistry coderRegistry = new CoderRegistry(); + coderRegistry.registerCoder(SomeRestriction.class, SomeRestrictionCoder.class); + assertThat( + invoker.<SomeRestriction>invokeGetRestrictionCoder(coderRegistry), + instanceOf(SomeRestrictionCoder.class)); + invoker.invokeSplitRestriction( + "blah", + "foo", + new DoFn.OutputReceiver<String>() { + private boolean invoked; + + @Override + public void output(String output) { + assertFalse(invoked); + invoked = true; + assertEquals("foo", output); + } + }); + assertEquals( + ProcessContinuation.stop(), invoker.invokeProcessElement(mockContext, extraContextFactory)); + } + + // --------------------------------------------------------------------------------------- // Tests for ability to invoke private, inner and anonymous classes. // --------------------------------------------------------------------------------------- @@ -235,14 +432,14 @@ public class DoFnInvokersTest { @Test public void testLocalPrivateDoFnClass() throws Exception { PrivateDoFnClass fn = mock(PrivateDoFnClass.class); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); verify(fn).processThis(mockContext); } @Test public void testStaticPackagePrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass()); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockContext); } @@ -250,28 +447,28 @@ public class DoFnInvokersTest { public void testInnerPackagePrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass()); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockContext); } @Test public void testStaticPrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass()); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockContext); } @Test public void testInnerPrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass()); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockContext); } @Test public void testAnonymousInnerDoFn() throws Exception { DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass()); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockContext); } @@ -279,7 +476,7 @@ public class DoFnInvokersTest { public void testStaticAnonymousDoFnInOtherPackage() throws Exception { // Can't use mockito for this one - the anonymous class is final and can't be mocked. DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFn(); - invokeProcessElement(fn); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); DoFnInvokersTestHelper.verifyStaticAnonymousDoFnInvoked(fn, mockContext); } @@ -303,6 +500,32 @@ public class DoFnInvokersTest { } @Test + public void testProcessElementExceptionWithReturn() throws Exception { + thrown.expect(UserCodeException.class); + thrown.expectMessage("bogus"); + DoFnInvokers.INSTANCE + .newByteBuddyInvoker( + new DoFn<Integer, Integer>() { + @ProcessElement + public ProcessContinuation processElement( + @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) { + throw new IllegalArgumentException("bogus"); + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + }) + .invokeProcessElement(null, new DoFn.FakeExtraContextFactory<Integer, Integer>()); + } + + @Test public void testStartBundleException() throws Exception { DoFnInvoker<Integer, Integer> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java index c269dbd..9cb1d23 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java @@ -61,7 +61,7 @@ public class DoFnSignaturesProcessElementTest { thrown.expect(IllegalArgumentException.class); thrown.expectMessage( "Integer is not a valid context parameter. " - + "Should be one of [BoundedWindow]"); + + "Should be one of [BoundedWindow, RestrictionTracker<?>]"); analyzeProcessElementMethod( new AnonymousMethod() { @@ -72,7 +72,7 @@ public class DoFnSignaturesProcessElementTest { @Test public void testBadReturnType() throws Exception { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Must return void"); + thrown.expectMessage("Must return void or ProcessContinuation"); analyzeProcessElementMethod( new AnonymousMethod() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java new file mode 100644 index 0000000..a9a7c81 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.analyzeProcessElementMethod; +import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.errors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.common.reflect.TypeToken; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; +import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; +import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod; +import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link DoFnSignatures} focused on methods related to <a + * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}. + */ +@SuppressWarnings("unused") +@RunWith(JUnit4.class) +public class DoFnSignaturesSplittableDoFnTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static class SomeRestriction {} + + private abstract static class SomeRestrictionTracker + implements RestrictionTracker<SomeRestriction> {} + + private abstract static class SomeRestrictionCoder implements Coder<SomeRestriction> {} + + @Test + public void testReturnsProcessContinuation() throws Exception { + DoFnSignature.ProcessElementMethod signature = + analyzeProcessElementMethod( + new AnonymousMethod() { + private DoFn.ProcessContinuation method( + DoFn<Integer, String>.ProcessContext context) { + return null; + } + }); + + assertTrue(signature.hasReturnValue()); + } + + @Test + public void testHasRestrictionTracker() throws Exception { + DoFnSignature.ProcessElementMethod signature = + analyzeProcessElementMethod( + new AnonymousMethod() { + private void method( + DoFn<Integer, String>.ProcessContext context, SomeRestrictionTracker tracker) {} + }); + + assertTrue(signature.isSplittable()); + assertTrue(signature.extraParameters().contains(DoFnSignature.Parameter.RESTRICTION_TRACKER)); + assertEquals(SomeRestrictionTracker.class, signature.trackerT().getRawType()); + } + + @Test + public void testSplittableProcessElementMustNotHaveOtherParams() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("must not have any extra context arguments"); + thrown.expectMessage("BOUNDED_WINDOW"); + + DoFnSignature.ProcessElementMethod signature = + analyzeProcessElementMethod( + new AnonymousMethod() { + private void method( + DoFn<Integer, String>.ProcessContext context, + SomeRestrictionTracker tracker, + BoundedWindow window) {} + }); + } + + @Test + public void testInfersBoundednessFromAnnotation() throws Exception { + class BaseSplittableFn extends DoFn<Integer, String> { + @ProcessElement + public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {} + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + } + + @BoundedPerElement + class BoundedSplittableFn extends BaseSplittableFn {} + + @UnboundedPerElement + class UnboundedSplittableFn extends BaseSplittableFn {} + + assertEquals( + PCollection.IsBounded.BOUNDED, + DoFnSignatures.INSTANCE + .getOrParseSignature(BaseSplittableFn.class) + .isBoundedPerElement()); + assertEquals( + PCollection.IsBounded.BOUNDED, + DoFnSignatures.INSTANCE + .getOrParseSignature(BoundedSplittableFn.class) + .isBoundedPerElement()); + assertEquals( + PCollection.IsBounded.UNBOUNDED, + DoFnSignatures.INSTANCE + .getOrParseSignature(UnboundedSplittableFn.class) + .isBoundedPerElement()); + } + + @Test + public void testUnsplittableIsBounded() throws Exception { + class UnsplittableFn extends DoFn<Integer, String> { + @ProcessElement + public void process(ProcessContext context) {} + } + assertEquals( + PCollection.IsBounded.BOUNDED, + DoFnSignatures.INSTANCE + .getOrParseSignature(UnsplittableFn.class) + .isBoundedPerElement()); + } + + private static class BaseFnWithContinuation extends DoFn<Integer, String> { + @ProcessElement + public ProcessContinuation processElement( + ProcessContext context, SomeRestrictionTracker tracker) { + return null; + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + } + + @Test + public void testSplittableIsBoundedByDefault() throws Exception { + assertEquals( + PCollection.IsBounded.UNBOUNDED, + DoFnSignatures.INSTANCE + .getOrParseSignature(BaseFnWithContinuation.class) + .isBoundedPerElement()); + } + + @Test + public void testSplittableRespectsBoundednessAnnotation() throws Exception { + @BoundedPerElement + class BoundedFnWithContinuation extends BaseFnWithContinuation {} + + assertEquals( + PCollection.IsBounded.BOUNDED, + DoFnSignatures.INSTANCE + .getOrParseSignature(BoundedFnWithContinuation.class) + .isBoundedPerElement()); + + @UnboundedPerElement + class UnboundedFnWithContinuation extends BaseFnWithContinuation {} + + assertEquals( + PCollection.IsBounded.UNBOUNDED, + DoFnSignatures.INSTANCE + .getOrParseSignature(UnboundedFnWithContinuation.class) + .isBoundedPerElement()); + } + + @Test + public void testUnsplittableButDeclaresBounded() throws Exception { + @BoundedPerElement + class SomeFn extends DoFn<Integer, String> { + @ProcessElement + public void process(ProcessContext context) {} + } + + thrown.expectMessage("Non-splittable, but annotated as @Bounded"); + DoFnSignatures.INSTANCE.getOrParseSignature(SomeFn.class); + } + + @Test + public void testUnsplittableButDeclaresUnbounded() throws Exception { + @UnboundedPerElement + class SomeFn extends DoFn<Integer, String> { + @ProcessElement + public void process(ProcessContext context) {} + } + + thrown.expectMessage("Non-splittable, but annotated as @Unbounded"); + DoFnSignatures.INSTANCE.getOrParseSignature(SomeFn.class); + } + + /** Tests a splittable {@link DoFn} that defines all methods in their full form, correctly. */ + @Test + public void testSplittableWithAllFunctions() throws Exception { + class GoodSplittableDoFn extends DoFn<Integer, String> { + @ProcessElement + public ProcessContinuation processElement( + ProcessContext context, SomeRestrictionTracker tracker) { + return null; + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + + @SplitRestriction + public void splitRestriction( + Integer element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {} + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + + @GetRestrictionCoder + public SomeRestrictionCoder getRestrictionCoder() { + return null; + } + } + + DoFnSignature signature = DoFnSignatures.INSTANCE.getOrParseSignature(GoodSplittableDoFn.class); + assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType()); + assertTrue(signature.processElement().isSplittable()); + assertTrue(signature.processElement().hasReturnValue()); + assertEquals( + SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType()); + assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType()); + assertEquals(SomeRestrictionTracker.class, signature.newTracker().trackerT().getRawType()); + assertEquals(SomeRestriction.class, signature.newTracker().restrictionT().getRawType()); + assertEquals(SomeRestrictionCoder.class, signature.getRestrictionCoder().coderT().getRawType()); + } + + /** + * Tests a splittable {@link DoFn} that defines all methods in their full form, correctly, using + * generic types. + */ + @Test + public void testSplittableWithAllFunctionsGeneric() throws Exception { + class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> { + @ProcessElement + public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) { + return null; + } + + @GetInitialRestriction + public RestrictionT getInitialRestriction(Integer element) { + return null; + } + + @SplitRestriction + public void splitRestriction( + Integer element, RestrictionT restriction, OutputReceiver<RestrictionT> receiver) {} + + @NewTracker + public TrackerT newTracker(RestrictionT restriction) { + return null; + } + + @GetRestrictionCoder + public CoderT getRestrictionCoder() { + return null; + } + } + + DoFnSignature signature = + DoFnSignatures.INSTANCE.getOrParseSignature( + new GoodGenericSplittableDoFn< + SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass()); + assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType()); + assertTrue(signature.processElement().isSplittable()); + assertTrue(signature.processElement().hasReturnValue()); + assertEquals( + SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType()); + assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType()); + assertEquals(SomeRestrictionTracker.class, signature.newTracker().trackerT().getRawType()); + assertEquals(SomeRestriction.class, signature.newTracker().restrictionT().getRawType()); + assertEquals(SomeRestrictionCoder.class, signature.getRestrictionCoder().coderT().getRawType()); + } + + @Test + public void testSplittableMissingRequiredMethods() throws Exception { + class BadFn extends DoFn<Integer, String> { + @ProcessElement + public void process(ProcessContext context, SomeRestrictionTracker tracker) {} + } + + thrown.expectMessage( + "Splittable, but does not define the following required methods: " + + "[@GetInitialRestriction, @NewTracker]"); + DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class); + } + + @Test + public void testNewTrackerReturnsWrongType() throws Exception { + class BadFn extends DoFn<Integer, String> { + @ProcessElement + public void process(ProcessContext context, SomeRestrictionTracker tracker) {} + + @NewTracker + public void newTracker(SomeRestriction restriction) {} + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + } + + thrown.expectMessage( + "Returns void, but must return a subtype of RestrictionTracker<SomeRestriction>"); + DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class); + } + + @Test + public void testGetInitialRestrictionMismatchesNewTracker() throws Exception { + class BadFn extends DoFn<Integer, String> { + @ProcessElement + public void process(ProcessContext context, SomeRestrictionTracker tracker) {} + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + + @GetInitialRestriction + public String getInitialRestriction(Integer element) { + return null; + } + } + + thrown.expectMessage( + "getInitialRestriction(Integer): Uses restriction type String, but @NewTracker method"); + thrown.expectMessage("newTracker(SomeRestriction) uses restriction type SomeRestriction"); + DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class); + } + + @Test + public void testGetRestrictionCoderReturnsWrongType() throws Exception { + class BadFn extends DoFn<Integer, String> { + @ProcessElement + public void process(ProcessContext context, SomeRestrictionTracker tracker) {} + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + + @GetRestrictionCoder + public KvCoder getRestrictionCoder() { + return null; + } + } + + thrown.expectMessage( + "getRestrictionCoder() returns KvCoder which is not a subtype of Coder<SomeRestriction>"); + DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class); + } + + @Test + public void testSplitRestrictionReturnsWrongType() throws Exception { + thrown.expectMessage( + "Third argument must be OutputReceiver<SomeRestriction>, but is OutputReceiver<String>"); + DoFnSignatures.analyzeSplitRestrictionMethod( + errors(), + TypeToken.of(FakeDoFn.class), + new AnonymousMethod() { + void method( + Integer element, SomeRestriction restriction, DoFn.OutputReceiver<String> receiver) {} + }.getMethod(), + TypeToken.of(Integer.class)); + } + + @Test + public void testSplitRestrictionWrongElementArgument() throws Exception { + class BadFn { + private List<SomeRestriction> splitRestriction(String element, SomeRestriction restriction) { + return null; + } + } + + thrown.expectMessage("First argument must be the element type Integer"); + DoFnSignatures.analyzeSplitRestrictionMethod( + errors(), + TypeToken.of(FakeDoFn.class), + new AnonymousMethod() { + void method( + String element, + SomeRestriction restriction, + DoFn.OutputReceiver<SomeRestriction> receiver) {} + }.getMethod(), + TypeToken.of(Integer.class)); + } + + @Test + public void testSplitRestrictionWrongNumArguments() throws Exception { + thrown.expectMessage("Must have exactly 3 arguments"); + DoFnSignatures.analyzeSplitRestrictionMethod( + errors(), + TypeToken.of(FakeDoFn.class), + new AnonymousMethod() { + private void method( + Integer element, + SomeRestriction restriction, + DoFn.OutputReceiver<SomeRestriction> receiver, + Object extra) {} + }.getMethod(), + TypeToken.of(Integer.class)); + } + + @Test + public void testSplitRestrictionConsistentButWrongType() throws Exception { + class OtherRestriction {} + + class BadFn extends DoFn<Integer, String> { + @ProcessElement + public void process(ProcessContext context, SomeRestrictionTracker tracker) {} + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + + @DoFn.SplitRestriction + public void splitRestriction( + Integer element, + OtherRestriction restriction, + OutputReceiver<OtherRestriction> receiver) {} + } + + thrown.expectMessage( + "getInitialRestriction(Integer): Uses restriction type SomeRestriction, " + + "but @SplitRestriction method "); + thrown.expectMessage( + "splitRestriction(Integer, OtherRestriction, OutputReceiver) " + + "uses restriction type OtherRestriction"); + DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class); + } + + @Test + public void testUnsplittableMustNotDefineExtraMethods() throws Exception { + class BadFn extends DoFn<Integer, String> { + @ProcessElement + public void processElement(ProcessContext context) {} + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + + @SplitRestriction + public void splitRestriction( + Integer element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {} + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + + @GetRestrictionCoder + public SomeRestrictionCoder getRestrictionCoder() { + return null; + } + } + + thrown.expectMessage( + "Non-splittable, but defines methods: " + + "[@GetInitialRestriction, @SplitRestriction, @NewTracker, @GetRestrictionCoder]"); + DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class); + } + + @Test + public void testNewTrackerWrongNumArguments() throws Exception { + thrown.expectMessage("Must have a single argument"); + DoFnSignatures.analyzeNewTrackerMethod( + errors(), + TypeToken.of(FakeDoFn.class), + new AnonymousMethod() { + private SomeRestrictionTracker method(SomeRestriction restriction, Object extra) { + return null; + } + }.getMethod()); + } + + @Test + public void testNewTrackerInconsistent() throws Exception { + thrown.expectMessage( + "Returns SomeRestrictionTracker, but must return a subtype of RestrictionTracker<String>"); + DoFnSignatures.analyzeNewTrackerMethod( + errors(), + TypeToken.of(FakeDoFn.class), + new AnonymousMethod() { + private SomeRestrictionTracker method(String restriction) { + return null; + } + }.getMethod()); + } +}