Condense FunctionSpec and remove SdkFunctionSpec, merging data and params

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3120fd9a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3120fd9a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3120fd9a

Branch: refs/heads/master
Commit: 3120fd9a499b01b08659edc4e045f5abbcc3ae07
Parents: 672d12a
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Feb 23 18:06:56 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Feb 24 14:38:28 2017 -0800

----------------------------------------------------------------------
 .../src/main/proto/beam_runner_api.proto        | 114 ++++++++++---------
 .../beam/sdk/util/WindowingStrategies.java      | 110 ++++++++++++------
 2 files changed, 134 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3120fd9a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto 
b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 32c53fb..989e4bb 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -74,7 +74,6 @@ message MessageWithComponents {
     PTransform ptransform = 7;
     PCollection pcollection = 8;
     ReadPayload read_payload = 9;
-    SdkFunctionSpec sdk_function_spec = 10;
     SideInput side_input = 11;
     WindowIntoPayload window_into_payload = 12;
     WindowingStrategy windowing_strategy = 13;
@@ -100,7 +99,8 @@ message Pipeline {
   // (Required) The id of the PTransform that is the root of the pipeline
   string root_transform_id = 2;
 
-  // (Required) Static display data for the pipeline.
+  // (Optional) Static display data for the pipeline. If there is none,
+  // it may be omitted.
   DisplayData display_data = 3;
 }
 
@@ -116,7 +116,8 @@ message PTransform {
   // etc.
   //
   // If it is not stable, then the runner decides what will happen. But, most
-  // importantly, it must always be here, even if it is autogenerated.
+  // importantly, it must always be here and be unique, even if it is
+  // autogenerated.
   string unique_name = 5;
 
   // (Optional) A URN and payload that, together, fully defined the semantics
@@ -125,13 +126,11 @@ message PTransform {
   // If absent, this must be an "anonymous" composite transform.
   //
   // For primitive transform in the Runner API, this is required, and the
-  // payloads are as follows:
+  // payloads are well-defined messages. When the URN indicates ParDo it
+  // is a ParDoPayload, and so on.
   //
-  //  - when the URN is "urn:beam:transforms:pardo" it is a ParDoPayload
-  //  - when the URN is "urn:beam:transforms:read" it is a ReadPayload
-  //  - when the URN is "urn:beam:transforms:gbk" it is a GroupByKeyPayload
-  //  - when the URN is "urn:beam:transforms:window" it is a WindowPayload
-  //  - when the URN is "urn:beam:transforms:flatten" it is absent
+  // TODO: document the standardized URNs and payloads
+  // TODO: separate standardized payloads into a separate proto file
   //
   // For some special composite transforms, the payload is also officially
   // defined:
@@ -144,7 +143,9 @@ message PTransform {
   // transforms that it contains.
   repeated string subtransforms = 2;
 
-  // (Required) A map from local names of inputs to PCollection ids.
+  // (Required) A map from local names of inputs (unique only within this map, and
+  // likely embedded in the transform payload and serialized user code) to
+  // PCollection ids.
   //
   // The payload for this transform may clarify the relationship of these
   // inputs. For example:
@@ -157,7 +158,9 @@ message PTransform {
   //
   map<string, string> inputs = 3;
 
-  // (Required) A map from local names of outputs to PCollection ids.
+  // (Required) A map from local names of outputs (unique only within this map,
+  // and likely embedded in the transform payload and serialized user code)
+  // to PCollection ids.
   //
   // The URN or payload for this transform node may clarify the type and
   // relationship of these outputs. For example:
@@ -167,7 +170,9 @@ message PTransform {
   //
   map<string, string> outputs = 4;
 
-  // (Required) Static display data for this PTransform application.
+  // (Optional) Static display data for this PTransform application. If
+  // there is none, or it is not relevant (such as use by the Fn API)
+  // then it may be omitted.
   DisplayData display_data = 6;
 }
 
@@ -193,7 +198,9 @@ message PCollection {
   // (Required) The id of the windowing strategy for this PCollection.
   string windowing_strategy_id = 4;
 
-  // (Required) Static display data for this PCollection.
+  // (Optional) Static display data for this PCollection. If
+  // there is none, or it is not relevant (such as use by the Fn API)
+  // then it may be omitted.
   DisplayData display_data = 5;
 }
 
@@ -301,18 +308,17 @@ message CombinePayload {
 // a pipeline.
 message Coder {
 
-  // (Required) A cross-language, stable, unique identifier for the (possibly
-  // parametric) encoding.
-  string urn = 1;
+  // (Required) A specification for the coder, as a URN plus parameters. This
+  // may be a cross-language agreed-upon format, or it may be a "custom coder"
+  // that can only be used by a particular SDK. It does not include component
+  // coders, as it is beneficial for these to be comprehensible to a runner
+  // regardless of whether the binary format is agreed-upon.
+  FunctionSpec spec = 1;
 
   // (Optional) If this coder is parametric, such as ListCoder(VarIntCoder),
   // this is a list of the components. In order for encodings to be identical,
-  // the URN and all components must be identical, recursively.
-  repeated string component_coder_id = 2;
-
-  // (Optional) The pipeline-scoped id for the FunctionSpec of an SDK-specific
-  // UDF implementing the encoding.
-  string custom_coder_fn_id = 3;
+  // the FunctionSpec and all components must be identical, recursively.
+  repeated string component_coder_ids = 2;
 }
 
 // A windowing strategy describes the window function, triggering, allowed
@@ -615,37 +621,51 @@ message Environment {
   string url = 1;
 }
 
-// Description of a function in a Beam pipeline.
+// A specification of a user defined function.
 //
-// Contains one of _or both of_ a UrnWithParameter specifying the function
-// and the specification for how to execute it against a particular
-// SDK's harness.
 message FunctionSpec {
 
-  // (Optional) An SDK-independent specification of this function.
-  // If present, this must _fully_ specify the function.
-  //
-  // For example the distinguished urn "urn:beam:windowfn:FixedWindows" with
-  // payload `{ duration: n }` fully specifies a windowing function which can
-  // be implemented by the SDK constructing the pipeline, by another SDK (for
-  // language-to-language fusion compatibility) or by the runner directly.
+  // (Required) A full specification of this function.
   UrnWithParameter spec = 1;
 
-  // (Optional) An SDK-specific specification for how to execute this function,
-  // including a specification of the environment in which the function
-  // can be interpreted and executed.
-  SdkFunctionSpec sdk_fn_spec = 2;
+  // (Required) Reference to an execution environment capable of
+  // invoking this function.
+  string environment_id = 2;
 }
 
 // A URN along with a parameter object whose schema is determined by the
 // URN.
 //
-// The URN will often specify a parametric function or transform such as
-// "Top" or "FixedWindows" while the payload would specify _n_ or
-// _duration_, respectively.
+// This structure is reused in two distinct, but compatible, ways:
+//
+// 1. This can be a specification of the function over PCollections
+//    that a PTransform computes.
+// 2. This can be a specification of a user-defined function, possibly
+//    SDK-specific. (Adequate context external to this message must
+//    indicate the environment in which the UDF can be understood.)
+//
+// Though not explicit in this proto, there are two possibilities
+// for the relationship of a runner to this specification that
+// one should bear in mind:
+//
+// 1. The runner understands the URN. For example, it might be
+//    a well-known URN like "urn:beam:transform:Top" or
+//    "urn:beam:windowfn:FixedWindows" with
+//    an agreed-upon payload (e.g. a number or duration,
+//    respectively).
+// 2. The runner does not understand the URN. It might be an
+//    SDK specific URN such as "urn:beam:dofn:javasdk:1.0"
+//    that indicates to the SDK what the payload is,
+//    such as a serialized Java DoFn from a particular
+//    version of the Beam Java SDK. The payload will often
+//    then be an opaque message such as bytes in a
+//    language-specific serialization format.
 message UrnWithParameter {
 
   // (Required) A URN that describes the accompanying payload.
+  // For any URN that is not recognized (by whoever is inspecting
+  // it) the parameter payload should be treated as opaque and
+  // passed as-is.
   string urn = 1;
 
   // (Optional) The data specifying any parameters to the URN. If
@@ -653,20 +673,6 @@ message UrnWithParameter {
   google.protobuf.Any parameter = 2;
 }
 
-// An arbitrary payload tagged with the environment that knows how to
-// interpret it as a user-defined function.
-message SdkFunctionSpec {
-
-  // (Required) Reference to the specification of the execution environment
-  // required to invoke this function.
-  string environment_id = 2;
-
-  // (Required) The raw data of the function that the SDK knows how to
-  // deserialize, but need not be comprehensible to any other runner, SDK, or
-  // other entity.
-  bytes data = 4;
-}
-
 // TODO: transfer javadoc here
 message DisplayData {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3120fd9a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
index 1af7719..3047da1 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
@@ -17,14 +17,21 @@
  */
 package org.apache.beam.sdk.util;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.UUID;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.UrnWithParameter;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
@@ -108,7 +115,6 @@ public class WindowingStrategies implements Serializable {
                 RunnerApi.ClosingBehavior.class.getCanonicalName(),
                 ClosingBehavior.class.getCanonicalName(),
                 proto));
-
     }
   }
 
@@ -122,46 +128,70 @@ public class WindowingStrategies implements Serializable {
   }
 
   // This URN says that the coder is just a UDF blob the indicated SDK 
understands
-  private static final String CUSTOM_CODER_URN = "urn:beam:coders:custom:1.0";
+  // TODO: standardize such things
+  private static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
+
+  // This URN says that the WindowFn is just a UDF blob the indicated SDK 
understands
+  // TODO: standardize such things
+  private static final String CUSTOM_WINDOWFN_URN = 
"urn:beam:windowfn:javasdk:0.1";
 
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+  /**
+   * Converts a {@link WindowFn} into a {@link 
RunnerApi.MessageWithComponents} where
+   * {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link 
RunnerApi.FunctionSpec}
+   * for the input {@link WindowFn}.
+   */
   public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> 
windowFn)
       throws IOException {
     Coder<?> windowCoder = windowFn.windowCoder();
 
     // TODO: re-use components
     String windowCoderId = UUID.randomUUID().toString();
-    String customCoderId = UUID.randomUUID().toString();
 
-    return RunnerApi.MessageWithComponents.newBuilder()
-        .setFunctionSpec(
-            RunnerApi.FunctionSpec.newBuilder()
-                .setSdkFnSpec(
-                    RunnerApi.SdkFunctionSpec.newBuilder()
-                        .setData(
-                            
ByteString.copyFrom(SerializableUtils.serializeToByteArray(windowFn)))))
-        .setComponents(
-            Components.newBuilder()
-                .putCoders(
-                    windowCoderId,
-                    RunnerApi.Coder.newBuilder()
-                        .setUrn(CUSTOM_CODER_URN)
-                        .setCustomCoderFnId(customCoderId)
-                        .build())
-                .putFunctionSpecs(
-                    customCoderId,
-                    RunnerApi.FunctionSpec.newBuilder()
-                        .setSdkFnSpec(
-                            RunnerApi.SdkFunctionSpec.newBuilder()
-                                .setData(
+    RunnerApi.FunctionSpec windowFnSpec =
+        RunnerApi.FunctionSpec.newBuilder()
+            .setSpec(
+                UrnWithParameter.newBuilder()
+                    .setUrn(CUSTOM_WINDOWFN_URN)
+                    .setParameter(
+                        Any.pack(
+                            BytesValue.newBuilder()
+                                .setValue(
                                     ByteString.copyFrom(
-                                        OBJECT_MAPPER.writeValueAsBytes(
-                                            windowCoder.asCloudObject()))))
-                        .build()))
+                                        
SerializableUtils.serializeToByteArray(windowFn)))
+                                .build())))
+            .build();
+
+    RunnerApi.Coder windowCoderProto =
+        RunnerApi.Coder.newBuilder()
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setSpec(
+                        UrnWithParameter.newBuilder()
+                            .setUrn(CUSTOM_CODER_URN)
+                            .setParameter(
+                                Any.pack(
+                                    BytesValue.newBuilder()
+                                        .setValue(
+                                            ByteString.copyFrom(
+                                                
OBJECT_MAPPER.writeValueAsBytes(
+                                                    
windowCoder.asCloudObject())))
+                                        .build()))))
+            .build();
+
+    return RunnerApi.MessageWithComponents.newBuilder()
+        .setFunctionSpec(windowFnSpec)
+        .setComponents(Components.newBuilder().putCoders(windowCoderId, 
windowCoderProto))
         .build();
   }
 
+  /**
+   * Converts a {@link WindowingStrategy} into a {@link 
RunnerApi.MessageWithComponents} where
+   * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} is a 
{@link
+   * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the 
input {@link
+   * WindowingStrategy}.
+   */
   public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, 
?> windowingStrategy)
       throws IOException {
 
@@ -195,7 +225,8 @@ public class WindowingStrategies implements Serializable {
    * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link 
RunnerApi.Components}
    * to the SDK's {@link WindowingStrategy}.
    */
-  public static WindowingStrategy<?, ?> 
fromProto(RunnerApi.MessageWithComponents proto) {
+  public static WindowingStrategy<?, ?> 
fromProto(RunnerApi.MessageWithComponents proto)
+      throws InvalidProtocolBufferException {
     switch (proto.getRootCase()) {
       case WINDOWING_STRATEGY:
         return fromProto(proto.getWindowingStrategy(), proto.getComponents());
@@ -212,15 +243,23 @@ public class WindowingStrategies implements Serializable {
    * the provided components to dereferences identifiers found in the proto.
    */
   public static WindowingStrategy<?, ?> fromProto(
-      RunnerApi.WindowingStrategy proto, RunnerApi.Components components) {
+      RunnerApi.WindowingStrategy proto, RunnerApi.Components components)
+      throws InvalidProtocolBufferException {
+
+    FunctionSpec windowFnSpec =
+        components
+            .getFunctionSpecsMap()
+            .get(proto.getFnId());
+
+    checkArgument(
+        windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
+        "Only Java-serialized %s instances are supported, with URN %s. But 
found URN %s",
+        WindowFn.class.getSimpleName(),
+        windowFnSpec.getSpec().getUrn());
+
     Object deserializedWindowFn =
         SerializableUtils.deserializeFromByteArray(
-            components
-                .getFunctionSpecsMap()
-                .get(proto.getFnId())
-                .getSdkFnSpec()
-                .getData()
-                .toByteArray(),
+            
windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
             "WindowFn");
 
     WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
@@ -237,5 +276,4 @@ public class WindowingStrategies implements Serializable {
         .withOutputTimeFn(outputTimeFn)
         .withClosingBehavior(closingBehavior);
   }
-
 }

Reply via email to