Add DoFnInvoker dispatch for State and Timer parameters
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2db8268 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2db8268 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2db8268 Branch: refs/heads/gearpump-runner Commit: e2db82686008aea224ca5cf1ef1acc2831c46ceb Parents: c052d2a Author: Kenneth Knowles <k...@google.com> Authored: Thu Nov 3 19:18:24 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Nov 7 15:25:03 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunner.java | 12 +++ .../beam/runners/core/SplittableParDo.java | 12 +++ .../org/apache/beam/sdk/transforms/DoFn.java | 20 ++++ .../beam/sdk/transforms/DoFnAdapters.java | 22 ++++ .../sdk/transforms/reflect/DoFnInvokers.java | 104 +++++++++++-------- .../transforms/reflect/DoFnInvokersTest.java | 59 ++++++++++- 6 files changed, 187 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index dec9905..3abb06b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -48,11 +48,13 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -532,6 +534,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override + public State state(String timerId) { + throw new UnsupportedOperationException("State parameters are not supported."); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("Timer parameters are not supported."); + } + + @Override public WindowingInternals<InputT, OutputT> windowingInternals() { return new WindowingInternals<InputT, OutputT>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 33d0ab7..d8ee1d5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -46,9 +46,11 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; @@ -432,6 +434,16 @@ public class SplittableParDo< public TrackerT restrictionTracker() { return tracker; } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("State cannot be used with a splittable DoFn"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("Timers cannot be used with a splittable DoFn"); + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 2b3962e..876dfe2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -381,6 +381,16 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * the current {@link ProcessElement} call. */ <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker(); + + /** + * Returns the state cell for the given {@link StateId}. + */ + State state(String stateId); + + /** + * Returns the timer for the given {@link TimerId}. + */ + Timer timer(String timerId); } /** Receives values of the given type. */ @@ -416,6 +426,16 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD return null; } + @Override + public State state(String stateId) { + return null; + } + + @Override + public Timer timer(String timerId) { + return null; + } + public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index ca724cd..420304b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -28,7 +28,9 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; @@ -343,6 +345,16 @@ public class DoFnAdapters { public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { throw new UnsupportedOperationException("This is a non-splittable DoFn"); } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("State is not supported by this runner"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("Timers are not supported by this runner"); + } } /** @@ -436,5 +448,15 @@ public class DoFnAdapters { public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { throw new UnsupportedOperationException("This is a non-splittable DoFn"); } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("State is not supported by this runner"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("Timers are not supported by this runner"); + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index ad2b766..b7f75ed 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -23,8 +23,6 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -52,6 +50,7 @@ import net.bytebuddy.implementation.bytecode.StackManipulation; import net.bytebuddy.implementation.bytecode.Throw; import net.bytebuddy.implementation.bytecode.assign.Assigner; import net.bytebuddy.implementation.bytecode.assign.TypeCasting; +import net.bytebuddy.implementation.bytecode.constant.TextConstant; import net.bytebuddy.implementation.bytecode.member.FieldAccess; import net.bytebuddy.implementation.bytecode.member.MethodInvocation; import net.bytebuddy.implementation.bytecode.member.MethodReturn; @@ -77,6 +76,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Restrictio import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.TypeDescriptor; @@ -503,15 +503,15 @@ public class DoFnInvokers { } } - private static StackManipulation simpleExtraContextParameter( - String methodName, - StackManipulation pushExtraContextFactory) { + /** + * This wrapper exists to convert checked exceptions to unchecked exceptions, since if this fails + * the library itself is malformed. + */ + private static MethodDescription getExtraContextFactoryMethodDescription( + String methodName, Class<?>... parameterTypes) { try { - return new StackManipulation.Compound( - pushExtraContextFactory, - MethodInvocation.invoke( - new MethodDescription.ForLoadedMethod( - DoFn.ExtraContextFactory.class.getMethod(methodName)))); + return new MethodDescription.ForLoadedMethod( + DoFn.ExtraContextFactory.class.getMethod(methodName, parameterTypes)); } catch (Exception e) { throw new IllegalStateException( String.format( @@ -521,47 +521,69 @@ public class DoFnInvokers { } } + private static StackManipulation simpleExtraContextParameter( + String methodName, + StackManipulation pushExtraContextFactory) { + return new StackManipulation.Compound( + pushExtraContextFactory, + MethodInvocation.invoke(getExtraContextFactoryMethodDescription(methodName))); + } + private static StackManipulation getExtraContextParameter( DoFnSignature.Parameter parameter, final StackManipulation pushExtraContextFactory) { - return parameter.match(new Cases<StackManipulation>() { + return parameter.match( + new Cases<StackManipulation>() { - @Override - public StackManipulation dispatch(BoundedWindowParameter p) { - return simpleExtraContextParameter("window", pushExtraContextFactory); - } + @Override + public StackManipulation dispatch(BoundedWindowParameter p) { + return simpleExtraContextParameter("window", pushExtraContextFactory); + } - @Override - public StackManipulation dispatch(InputProviderParameter p) { - return simpleExtraContextParameter("inputProvider", pushExtraContextFactory); - } + @Override + public StackManipulation dispatch(InputProviderParameter p) { + return simpleExtraContextParameter("inputProvider", pushExtraContextFactory); + } - @Override - public StackManipulation dispatch(OutputReceiverParameter p) { - return simpleExtraContextParameter("outputReceiver", pushExtraContextFactory); - } + @Override + public StackManipulation dispatch(OutputReceiverParameter p) { + return simpleExtraContextParameter("outputReceiver", pushExtraContextFactory); + } - @Override - public StackManipulation dispatch(RestrictionTrackerParameter p) { - // ExtraContextFactory.restrictionTracker() returns a RestrictionTracker, - // but the @ProcessElement method expects a concrete subtype of it. - // Insert a downcast. - return new StackManipulation.Compound( - simpleExtraContextParameter("restrictionTracker", pushExtraContextFactory), - TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType()))); - } + @Override + public StackManipulation dispatch(RestrictionTrackerParameter p) { + // ExtraContextFactory.restrictionTracker() returns a RestrictionTracker, + // but the @ProcessElement method expects a concrete subtype of it. + // Insert a downcast. + return new StackManipulation.Compound( + simpleExtraContextParameter("restrictionTracker", pushExtraContextFactory), + TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType()))); + } - @Override - public StackManipulation dispatch(StateParameter p) { - throw new UnsupportedOperationException("State parameters are not yet supported."); - } + @Override + public StackManipulation dispatch(StateParameter p) { + return new StackManipulation.Compound( + // TOP = extraContextFactory.state(<id>) + pushExtraContextFactory, + new TextConstant(p.referent().id()), + MethodInvocation.invoke( + getExtraContextFactoryMethodDescription("state", String.class)), + TypeCasting.to( + new TypeDescription.ForLoadedType(p.referent().stateType().getRawType()))); + } - @Override - public StackManipulation dispatch(TimerParameter p) { - throw new UnsupportedOperationException("Timer parameters are not yet supported."); - } - }); + @Override + public StackManipulation dispatch(TimerParameter p) { + return new StackManipulation.Compound( + // TOP = extraContextFactory.state(<id>) + pushExtraContextFactory, + new TextConstant(p.referent().id()), + MethodInvocation.invoke( + getExtraContextFactoryMethodDescription("timer", String.class)), + TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class))); + } + }); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index dbb7955..60f82a8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -37,16 +37,23 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.GetInitialRestriction; import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; +import org.apache.beam.sdk.util.state.ValueState; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -173,6 +180,56 @@ public class DoFnInvokersTest { verify(fn).processElement(mockContext, mockWindow); } + /** + * Tests that the generated {@link DoFnInvoker} passes the state parameter that it + * should. + */ + @Test + public void testDoFnWithState() throws Exception { + ValueState<Integer> mockState = mock(ValueState.class); + final String stateId = "my-state-id-here"; + when(extraContextFactory.state(stateId)).thenReturn(mockState); + + class MockFn extends DoFn<String, String> { + @StateId(stateId) + private final StateSpec<Object, ValueState<Integer>> spec = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement(ProcessContext c, @StateId(stateId) ValueState<Integer> valueState) + throws Exception {} + } + MockFn fn = mock(MockFn.class); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + verify(fn).processElement(mockContext, mockState); + } + + /** + * Tests that the generated {@link DoFnInvoker} passes the timer parameter that it + * should. + */ + @Test + public void testDoFnWithTimer() throws Exception { + Timer mockTimer = mock(Timer.class); + final String timerId = "my-timer-id-here"; + when(extraContextFactory.timer(timerId)).thenReturn(mockTimer); + + class MockFn extends DoFn<String, String> { + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement(ProcessContext c, @TimerId(timerId) Timer timer) + throws Exception {} + + @OnTimer(timerId) + public void onTimer() {} + } + MockFn fn = mock(MockFn.class); + assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + verify(fn).processElement(mockContext, mockTimer); + } + @Test public void testDoFnWithOutputReceiver() throws Exception { class MockFn extends DoFn<String, String> {