Repository: beam
Updated Branches:
  refs/heads/master ed7b82e7e -> 144bffd40


Add wrapping of lambda in a SimpleFunction


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23152178
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23152178
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23152178

Branch: refs/heads/master
Commit: 23152178f81e635db65a7aae71f47fa67b3dc065
Parents: ed7b82e
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Jan 26 11:19:42 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Feb 7 08:16:39 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/transforms/SimpleFunction.java     | 44 +++++++++++--
 .../beam/sdk/transforms/SimpleFunctionTest.java | 43 ++++++++++++
 .../sdk/transforms/MapElementsJava8Test.java    | 24 ++++++-
 .../sdk/transforms/SimpleFunctionJava8Test.java | 69 ++++++++++++++++++++
 4 files changed, 170 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/23152178/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
index 8604659..db44380 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import java.lang.reflect.Method;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -29,6 +31,40 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 public abstract class SimpleFunction<InputT, OutputT>
     implements SerializableFunction<InputT, OutputT>, HasDisplayData {
 
+  @Nullable
+  private final SerializableFunction<InputT, OutputT> fn;
+
+  protected SimpleFunction() {
+    this.fn = null;
+    // A subclass must override apply if using this constructor. Check that via
+    // reflection.
+    try {
+      Method methodThatMustBeOverridden =
+          SimpleFunction.class.getDeclaredMethod("apply", new Class[] 
{Object.class});
+      Method methodOnSubclass =
+          getClass().getMethod("apply", new Class[] {Object.class});
+
+      if (methodOnSubclass.equals(methodThatMustBeOverridden)) {
+        throw new IllegalStateException(
+            "Subclass of SimpleFunction must override 'apply' method"
+                + " or pass a SerializableFunction to the constructor,"
+                + " usually via a lambda or method reference.");
+      }
+
+    } catch (NoSuchMethodException exc) {
+      throw new RuntimeException("Impossible state: missing 'apply' method 
entirely", exc);
+    }
+  }
+
+  protected SimpleFunction(SerializableFunction<InputT, OutputT> fn) {
+    this.fn = fn;
+  }
+
+  @Override
+  public OutputT apply(InputT input) {
+    return fn.apply(input);
+  }
+
   public static <InputT, OutputT>
       SimpleFunction<InputT, OutputT> fromSerializableFunctionWithOutputType(
           SerializableFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> 
outputType) {
@@ -77,23 +113,17 @@ public abstract class SimpleFunction<InputT, OutputT>
   private static class SimpleFunctionWithOutputType<InputT, OutputT>
       extends SimpleFunction<InputT, OutputT> {
 
-    private final SerializableFunction<InputT, OutputT> fn;
     private final TypeDescriptor<OutputT> outputType;
 
     public SimpleFunctionWithOutputType(
         SerializableFunction<InputT, OutputT> fn,
         TypeDescriptor<OutputT> outputType) {
-      this.fn = fn;
+      super(fn);
       this.outputType = outputType;
     }
 
 
     @Override
-    public OutputT apply(InputT input) {
-      return fn.apply(input);
-    }
-
-    @Override
     public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
       return outputType;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/23152178/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java
new file mode 100644
index 0000000..bcfb558
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link SimpleFunction}.
+ */
+@RunWith(JUnit4.class)
+public class SimpleFunctionTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testFailureIfNotOverridden() {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("must override");
+    thrown.expectMessage("apply");
+
+    SimpleFunction<Integer, Integer> broken = new SimpleFunction<Integer, 
Integer>() {};
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/23152178/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
index ce0f111..7e63a7d 100644
--- 
a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
+++ 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
@@ -37,11 +37,11 @@ public class MapElementsJava8Test implements Serializable {
   public final transient TestPipeline pipeline = TestPipeline.create();
 
   /**
-   * Basic test of {@link MapElements} with a lambda (which is instantiated as 
a
-   * {@link SerializableFunction}).
+   * Basic test of {@link MapElements} with a lambda (which is instantiated as 
a {@link
+   * SerializableFunction}).
    */
   @Test
-  public void testMapBasic() throws Exception {
+  public void testMapLambda() throws Exception {
 
     PCollection<Integer> output = pipeline
         .apply(Create.of(1, 2, 3))
@@ -55,6 +55,24 @@ public class MapElementsJava8Test implements Serializable {
   }
 
   /**
+   * Basic test of {@link MapElements} with a lambda wrapped into a {@link 
SimpleFunction} to
+   * remember its type.
+   */
+  @Test
+  public void testMapWrappedLambda() throws Exception {
+
+    PCollection<Integer> output =
+        pipeline
+            .apply(Create.of(1, 2, 3))
+            .apply(
+                MapElements
+                    .via(new SimpleFunction<Integer, Integer>((Integer i) -> i 
* 2) {}));
+
+    PAssert.that(output).containsInAnyOrder(6, 2, 4);
+    pipeline.run();
+  }
+
+  /**
    * Basic test of {@link MapElements} with a method reference.
    */
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/23152178/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java
 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java
new file mode 100644
index 0000000..9beab34
--- /dev/null
+++ 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Java 8 tests for {@link SimpleFunction}.
+ */
+@RunWith(JUnit4.class)
+public class SimpleFunctionJava8Test implements Serializable {
+
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
+  /**
+   * Basic test of {@link MapElements} with a lambda (which is instantiated as 
a {@link
+   * SerializableFunction}).
+   */
+  @Test
+  public void testGoodTypeForLambda() throws Exception {
+    SimpleFunction<Integer, String> fn =
+        new SimpleFunction<Integer, String>((Integer i) -> i.toString()) {};
+
+    assertThat(fn.getInputTypeDescriptor(), 
equalTo(TypeDescriptors.integers()));
+    assertThat(fn.getOutputTypeDescriptor(), 
equalTo(TypeDescriptors.strings()));
+  }
+
+  /**
+   * Basic test of {@link MapElements} with a lambda wrapped into a {@link 
SimpleFunction} to
+   * remember its type.
+   */
+  @Test
+  public void testGoodTypeForMethodRef() throws Exception {
+    SimpleFunction<Integer, String> fn =
+        new SimpleFunction<Integer, 
String>(SimpleFunctionJava8Test::toStringThisThing) {};
+
+    assertThat(fn.getInputTypeDescriptor(), 
equalTo(TypeDescriptors.integers()));
+    assertThat(fn.getOutputTypeDescriptor(), 
equalTo(TypeDescriptors.strings()));
+  }
+
+  private static String toStringThisThing(Integer i) {
+    return i.toString();
+  }
+}

Reply via email to