Repository: incubator-beam Updated Branches: refs/heads/master 0866e4906 -> 2cbac53c5
Create DoFnInvoker instances in the package of the invoked DoFn. The DoFnInvoker implementation for a DoFn carries a field with the type of the invoked DoFn, and that type needs to be visible. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/681ad896 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/681ad896 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/681ad896 Branch: refs/heads/master Commit: 681ad8968ef68a309e118ae0d87635535ec26b3f Parents: e8695a1 Author: Kenneth Knowles <k...@google.com> Authored: Tue Jul 26 21:40:02 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Jul 27 14:00:05 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/transforms/DoFnReflector.java | 20 +++- .../beam/sdk/transforms/DoFnReflectorTest.java | 75 +++++++++++- .../dofnreflector/DoFnReflectorTestHelper.java | 116 +++++++++++++++++++ 3 files changed, 203 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/681ad896/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java index 116b64d..0616eff 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java @@ -42,16 +42,18 @@ import com.google.common.reflect.TypeParameter; import com.google.common.reflect.TypeToken; import net.bytebuddy.ByteBuddy; +import net.bytebuddy.NamingStrategy.SuffixingRandom; import net.bytebuddy.description.field.FieldDescription; import 
net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.method.ParameterList; import net.bytebuddy.description.modifier.FieldManifestation; import net.bytebuddy.description.modifier.Visibility; import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.description.type.TypeDescription.Generic; import net.bytebuddy.dynamic.DynamicType; import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.dynamic.scaffold.InstrumentedType; -import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; +import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy.Default; import net.bytebuddy.implementation.Implementation; import net.bytebuddy.implementation.MethodCall.MethodLocator; import net.bytebuddy.implementation.StubMethod; @@ -70,7 +72,6 @@ import net.bytebuddy.jar.asm.Label; import net.bytebuddy.jar.asm.MethodVisitor; import net.bytebuddy.jar.asm.Opcodes; import net.bytebuddy.matcher.ElementMatchers; - import org.joda.time.Instant; import java.io.IOException; @@ -89,7 +90,6 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; - import javax.annotation.Nullable; @@ -499,8 +499,20 @@ public abstract class DoFnReflector { */ private Constructor<? extends DoFnInvoker<?, ?>> createInvokerConstructor( @SuppressWarnings("rawtypes") Class<? 
extends DoFnWithContext> clazz) { + + final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(clazz); + DynamicType.Builder<?> builder = new ByteBuddy() - .subclass(DoFnInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS) + // Create subclasses inside the target class, to have access to + // private and package-private bits + .with(new SuffixingRandom("auxiliary") { + @Override + public String subclass(Generic superClass) { + return super.name(clazzDescription); + } + }) + // Create a subclass of DoFnInvoker + .subclass(DoFnInvoker.class, Default.NO_CONSTRUCTORS) .defineField(FN_DELEGATE_FIELD_NAME, clazz, Visibility.PRIVATE, FieldManifestation.FINAL) // Define a constructor to populate fields appropriately. .defineConstructor(Visibility.PUBLIC) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/681ad896/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java index cf9f8e8..3238f2c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.DoFnWithContext.Context; import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory; import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessContext; import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessElement; +import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowingInternals; @@ -46,10 +47,13 @@ import java.lang.reflect.Method; 
@RunWith(JUnit4.class) public class DoFnReflectorTest { - private static class Invocations { - private boolean wasProcessElementInvoked = false; - private boolean wasStartBundleInvoked = false; - private boolean wasFinishBundleInvoked = false; + /** + * A convenience struct holding flags that indicate whether a particular method was invoked. + */ + public static class Invocations { + public boolean wasProcessElementInvoked = false; + public boolean wasStartBundleInvoked = false; + public boolean wasFinishBundleInvoked = false; private final String name; public Invocations(String name) { @@ -357,6 +361,69 @@ public class DoFnReflectorTest { }); } + private static class PrivateDoFnClass extends DoFnWithContext<String, String> { + final Invocations invocations = new Invocations(getClass().getName()); + + @ProcessElement + public void processThis(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + } + + @Test + public void testLocalPrivateDoFnClass() throws Exception { + PrivateDoFnClass fn = new PrivateDoFnClass(); + DoFnReflector reflector = underTest(fn); + checkInvokeProcessElementWorks(reflector, fn.invocations); + } + + @Test + public void testStaticPackagePrivateDoFnClass() throws Exception { + Invocations invocations = new Invocations("StaticPackagePrivateDoFn"); + DoFnReflector reflector = + underTest(DoFnReflectorTestHelper.newStaticPackagePrivateDoFn(invocations)); + checkInvokeProcessElementWorks(reflector, invocations); + } + + @Test + public void testInnerPackagePrivateDoFnClass() throws Exception { + Invocations invocations = new Invocations("InnerPackagePrivateDoFn"); + DoFnReflector reflector = + underTest(new DoFnReflectorTestHelper().newInnerPackagePrivateDoFn(invocations)); + checkInvokeProcessElementWorks(reflector, invocations); + } + + @Test + public void testStaticPrivateDoFnClass() throws Exception { + Invocations invocations = new Invocations("StaticPrivateDoFn"); + DoFnReflector reflector = 
underTest(DoFnReflectorTestHelper.newStaticPrivateDoFn(invocations)); + checkInvokeProcessElementWorks(reflector, invocations); + } + + @Test + public void testInnerPrivateDoFnClass() throws Exception { + Invocations invocations = new Invocations("StaticInnerDoFn"); + DoFnReflector reflector = + underTest(new DoFnReflectorTestHelper().newInnerPrivateDoFn(invocations)); + checkInvokeProcessElementWorks(reflector, invocations); + } + + @Test + public void testAnonymousInnerDoFnInOtherPackage() throws Exception { + Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage"); + DoFnReflector reflector = + underTest(new DoFnReflectorTestHelper().newInnerAnonymousDoFn(invocations)); + checkInvokeProcessElementWorks(reflector, invocations); + } + + @Test + public void testStaticAnonymousDoFnInOtherPackage() throws Exception { + Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage"); + DoFnReflector reflector = + underTest(DoFnReflectorTestHelper.newStaticAnonymousDoFn(invocations)); + checkInvokeProcessElementWorks(reflector, invocations); + } + @Test public void testPrivateProcessElement() throws Exception { thrown.expect(IllegalStateException.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/681ad896/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java new file mode 100644 index 0000000..5ff2bf1 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.dofnreflector; + +import org.apache.beam.sdk.transforms.DoFnReflectorTest.Invocations; +import org.apache.beam.sdk.transforms.DoFnWithContext; + +/** + * Test helper for DoFnReflectorTest, which needs to test package-private access + * to DoFns in other packages. 
+ */ +public class DoFnReflectorTestHelper { + + private static class StaticPrivateDoFn extends DoFnWithContext<String, String> { + final Invocations invocations; + + public StaticPrivateDoFn(Invocations invocations) { + this.invocations = invocations; + } + + @ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + } + + private class InnerPrivateDoFn extends DoFnWithContext<String, String> { + final Invocations invocations; + + public InnerPrivateDoFn(Invocations invocations) { + this.invocations = invocations; + } + + @ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + } + + static class StaticPackagePrivateDoFn extends DoFnWithContext<String, String> { + final Invocations invocations; + + public StaticPackagePrivateDoFn(Invocations invocations) { + this.invocations = invocations; + } + + @ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + } + + class InnerPackagePrivateDoFn extends DoFnWithContext<String, String> { + final Invocations invocations; + + public InnerPackagePrivateDoFn(Invocations invocations) { + this.invocations = invocations; + } + + @ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + } + + public static DoFnWithContext<String, String> newStaticPackagePrivateDoFn( + Invocations invocations) { + return new StaticPackagePrivateDoFn(invocations); + } + + public DoFnWithContext<String, String> newInnerPackagePrivateDoFn(Invocations invocations) { + return new InnerPackagePrivateDoFn(invocations); + } + + public static DoFnWithContext<String, String> newStaticPrivateDoFn(Invocations invocations) { + return new StaticPrivateDoFn(invocations); + } + + public DoFnWithContext<String, String> newInnerPrivateDoFn(Invocations invocations) { + return new InnerPrivateDoFn(invocations); + } + + public DoFnWithContext<String, String> 
newInnerAnonymousDoFn(final Invocations invocations) { + return new DoFnWithContext<String, String>() { + @ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + }; + } + + public static DoFnWithContext<String, String> newStaticAnonymousDoFn( + final Invocations invocations) { + return new DoFnWithContext<String, String>() { + @ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + }; + } +}