Repository: beam Updated Branches: refs/heads/master f5714f220 -> 9ed2cf41f
Use bytes instead of Any in RunnerApi.FunctionSpec. Keep an "any" field, renamed to any_param. Rename `parameter` to `payload`. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b9b0504 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b9b0504 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b9b0504 Branch: refs/heads/master Commit: 2b9b05049822a22154ac3c2f6b593061f42b54c1 Parents: f5714f2 Author: Thomas Groh <tg...@google.com> Authored: Mon Jun 5 11:22:56 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue Aug 8 18:18:59 2017 -0700 ---------------------------------------------------------------------- .../core/construction/CoderTranslation.java | 16 +--- .../core/construction/CombineTranslation.java | 28 ++---- .../CreatePCollectionViewTranslation.java | 17 +--- .../construction/PTransformTranslation.java | 12 +-- .../core/construction/ParDoTranslation.java | 58 +++++------- .../core/construction/PipelineTranslation.java | 12 +-- .../core/construction/ReadTranslation.java | 56 +++++------ .../construction/TestStreamTranslation.java | 5 +- .../construction/WindowIntoTranslation.java | 9 +- .../WindowingStrategyTranslation.java | 97 ++++++++------------ .../construction/WriteFilesTranslation.java | 29 +++--- .../CreatePCollectionViewTranslationTest.java | 6 +- .../core/construction/ParDoTranslationTest.java | 3 +- .../construction/TestStreamTranslationTest.java | 4 +- .../src/main/proto/beam_runner_api.proto | 5 +- .../beam/fn/harness/BeamFnDataReadRunner.java | 4 +- .../beam/fn/harness/BeamFnDataWriteRunner.java | 4 +- .../beam/fn/harness/BoundedSourceRunner.java | 10 +- .../apache/beam/fn/harness/FnApiDoFnRunner.java | 10 +- .../fn/harness/BeamFnDataReadRunnerTest.java | 5 +- .../fn/harness/BeamFnDataWriteRunnerTest.java | 5 +- .../fn/harness/BoundedSourceRunnerTest.java | 18 ++-- .../beam/fn/harness/FnApiDoFnRunnerTest.java | 13 +-- 
sdks/python/apache_beam/coders/coders.py | 11 +-- .../runners/portability/fn_api_runner.py | 78 ++++++++++------ .../runners/worker/bundle_processor.py | 21 ++--- sdks/python/apache_beam/transforms/core.py | 10 +- .../python/apache_beam/transforms/ptransform.py | 6 +- sdks/python/apache_beam/utils/proto_utils.py | 11 +++ sdks/python/apache_beam/utils/urns.py | 6 +- 30 files changed, 250 insertions(+), 319 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java index a6719ff..2246f81 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java @@ -24,9 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; @@ -138,13 +136,9 @@ public class CoderTranslation { .setSpec( FunctionSpec.newBuilder() .setUrn(JAVA_SERIALIZED_CODER_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(coder))) - .build())))) + .setPayload( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(coder))) + 
.build())) .build(); } @@ -182,9 +176,7 @@ public class CoderTranslation { protoCoder .getSpec() .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() + .getPayload() .toByteArray(), "Custom Coder Bytes"); } http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java index d909ccf..17c48dc 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java @@ -23,9 +23,7 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.CO import com.google.auto.service.AutoService; import com.google.common.collect.Iterables; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -79,7 +77,7 @@ public class CombineTranslation { CombinePayload payload = toProto(transform, components); return RunnerApi.FunctionSpec.newBuilder() .setUrn(COMBINE_TRANSFORM_URN) - .setParameter(Any.pack(payload)) + .setPayload(payload.toByteString()) .build(); } @@ -138,13 +136,9 @@ public class CombineTranslation { .setSpec( FunctionSpec.newBuilder() .setUrn(JAVA_SERIALIZED_COMBINE_FN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(combineFn))) - .build()))) + .setPayload( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn))) + 
.build()) .build(); } @@ -171,9 +165,7 @@ public class CombineTranslation { payload .getCombineFn() .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() + .getPayload() .toByteArray(), "CombineFn"); } @@ -190,10 +182,10 @@ public class CombineTranslation { private static CombinePayload getCombinePayload( AppliedPTransform<?, ?, ?> transform, SdkComponents components) throws IOException { - return PTransformTranslation.toProto( - transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), components) - .getSpec() - .getParameter() - .unpack(CombinePayload.class); + return CombinePayload.parseFrom( + PTransformTranslation.toProto( + transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), components) + .getSpec() + .getPayload()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java index c67d688..1027ea2 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java @@ -21,9 +21,7 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; import com.google.auto.service.AutoService; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.Collections; import java.util.Map; @@ -79,9 +77,7 @@ public class 
CreatePCollectionViewTranslation { SerializableUtils.deserializeFromByteArray( transformProto .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() + .getPayload() .toByteArray(), PCollectionView.class.getSimpleName()); } @@ -104,14 +100,9 @@ public class CreatePCollectionViewTranslation { SdkComponents components) { return FunctionSpec.newBuilder() .setUrn(getUrn(transform.getTransform())) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray( - transform.getTransform().getView()))) - .build())) + .setPayload( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(transform.getTransform().getView()))) .build(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index b8365c9..4bfe17a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -21,7 +21,7 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Any; +import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -131,9 +131,9 @@ public class PTransformTranslation { if (rawPTransform.getUrn() != null) { FunctionSpec.Builder payload = 
FunctionSpec.newBuilder().setUrn(rawPTransform.getUrn()); - @Nullable Any parameter = rawPTransform.getPayload(); + @Nullable ByteString parameter = rawPTransform.getPayload(); if (parameter != null) { - payload.setParameter(parameter); + payload.setPayload(parameter); } transformBuilder.setSpec(payload); } @@ -224,7 +224,7 @@ public class PTransformTranslation { public abstract String getUrn(); @Nullable - public Any getPayload() { + public ByteString getPayload() { return null; } @@ -254,9 +254,9 @@ public class PTransformTranslation { FunctionSpec.Builder transformSpec = FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())); - Any payload = transform.getTransform().getPayload(); + ByteString payload = transform.getTransform().getPayload(); if (payload != null) { - transformSpec.setParameter(payload); + transformSpec.setPayload(payload); } // Transforms like Combine may have Coders that need to be added but do not http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index 5765c51..6ae95e4 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -29,9 +29,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import 
com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.io.Serializable; @@ -122,7 +120,7 @@ public class ParDoTranslation { ParDoPayload payload = toProto(transform.getTransform(), components); return RunnerApi.FunctionSpec.newBuilder() .setUrn(PAR_DO_TRANSFORM_URN) - .setParameter(Any.pack(payload)) + .setPayload(payload.toByteString()) .build(); } @@ -240,7 +238,7 @@ public class ParDoTranslation { RunnerApi.PTransform protoTransform = PTransformTranslation.toProto(application, SdkComponents.create()); - ParDoPayload payload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class); + ParDoPayload payload = ParDoPayload.parseFrom(protoTransform.getSpec().getPayload()); TupleTag<?> mainOutputTag = getMainOutputTag(payload); Set<String> outputTags = Sets.difference( @@ -259,7 +257,7 @@ public class ParDoTranslation { SdkComponents sdkComponents = SdkComponents.create(); RunnerApi.PTransform parDoProto = PTransformTranslation.toProto(application, sdkComponents); - ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class); + ParDoPayload payload = ParDoPayload.parseFrom(parDoProto.getSpec().getPayload()); List<PCollectionView<?>> views = new ArrayList<>(); RehydratedComponents components = @@ -289,7 +287,7 @@ public class ParDoTranslation { ptransform.getSpec().getUrn().equals(PAR_DO_TRANSFORM_URN), "Unexpected payload type %s", ptransform.getSpec().getUrn()); - ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class); + ParDoPayload payload = ParDoPayload.parseFrom(ptransform.getSpec().getPayload()); String mainInputId = Iterables.getOnlyElement( Sets.difference( @@ -377,7 +375,7 @@ public class ParDoTranslation { Combine.CombineFn<?, ?, ?> combineFn = (Combine.CombineFn<?, ?, ?>) SerializableUtils.deserializeFromByteArray( - combineFnSpec.getParameter().unpack(BytesValue.class).toByteArray(), + combineFnSpec.getPayload().toByteArray(), 
Combine.CombineFn.class.getSimpleName()); // Rawtype coder cast because it is required to be a valid accumulator coder @@ -443,14 +441,10 @@ public class ParDoTranslation { .setSpec( FunctionSpec.newBuilder() .setUrn(CUSTOM_JAVA_DO_FN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray( - DoFnAndMainOutput.of(fn, tag)))) - .build()))) + .setPayload( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(DoFnAndMainOutput.of(fn, tag)))) + .build()) .build(); } @@ -458,7 +452,7 @@ public class ParDoTranslation { throws InvalidProtocolBufferException { checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN)); byte[] serializedFn = - fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(); + fnSpec.getSpec().getPayload().toByteArray(); return (DoFnAndMainOutput) SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag"); } @@ -542,22 +536,17 @@ public class ParDoTranslation { .setSpec( FunctionSpec.newBuilder() .setUrn(CUSTOM_JAVA_VIEW_FN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn))) - .build()))) + .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn))) + .build()) .build(); } private static <T> ParDoPayload getParDoPayload(AppliedPTransform<?, ?, ?> transform) throws IOException { - return PTransformTranslation.toProto( - transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create()) - .getSpec() - .getParameter() - .unpack(ParDoPayload.class); + RunnerApi.PTransform parDoPTransform = + PTransformTranslation.toProto( + transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create()); + return ParDoPayload.parseFrom(parDoPTransform.getSpec().getPayload()); } public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) 
throws IOException { @@ -580,7 +569,7 @@ public class ParDoTranslation { spec.getUrn()); return (ViewFn<?, ?>) SerializableUtils.deserializeFromByteArray( - spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom ViewFn"); + spec.getPayload().toByteArray(), "Custom ViewFn"); } private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) { @@ -588,13 +577,9 @@ public class ParDoTranslation { .setSpec( FunctionSpec.newBuilder() .setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(windowMappingFn))) - .build()))) + .setPayload( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(windowMappingFn))) + .build()) .build(); } @@ -608,7 +593,6 @@ public class ParDoTranslation { spec.getUrn()); return (WindowMappingFn<?>) SerializableUtils.deserializeFromByteArray( - spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), - "Custom WinodwMappingFn"); + spec.getPayload().toByteArray(), "Custom WinodwMappingFn"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java index 9e4839a..d928338 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java @@ -24,7 +24,7 @@ import com.google.auto.value.AutoValue; import com.google.common.base.MoreObjects; import 
com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; -import com.google.protobuf.Any; +import com.google.protobuf.ByteString; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -158,7 +158,7 @@ public class PipelineTranslation { // TODO: ParDoTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674 if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) { RunnerApi.ParDoPayload payload = - transformSpec.getParameter().unpack(RunnerApi.ParDoPayload.class); + RunnerApi.ParDoPayload.parseFrom(transformSpec.getPayload()); List<PCollectionView<?>> views = new ArrayList<>(); for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry : @@ -182,7 +182,7 @@ public class PipelineTranslation { List<Coder<?>> additionalCoders = Collections.emptyList(); if (transformSpec.getUrn().equals(PTransformTranslation.COMBINE_TRANSFORM_URN)) { RunnerApi.CombinePayload payload = - transformSpec.getParameter().unpack(RunnerApi.CombinePayload.class); + RunnerApi.CombinePayload.parseFrom(transformSpec.getPayload()); additionalCoders = (List) Collections.singletonList( @@ -192,7 +192,7 @@ public class PipelineTranslation { RehydratedPTransform transform = RehydratedPTransform.of( transformSpec.getUrn(), - transformSpec.getParameter(), + transformSpec.getPayload(), additionalInputs, additionalCoders); @@ -233,7 +233,7 @@ public class PipelineTranslation { public abstract String getUrn(); @Nullable - public abstract Any getPayload(); + public abstract ByteString getPayload(); @Override public abstract Map<TupleTag<?>, PValue> getAdditionalInputs(); @@ -242,7 +242,7 @@ public class PipelineTranslation { public static RehydratedPTransform of( String urn, - Any payload, + ByteString payload, Map<TupleTag<?>, PValue> additionalInputs, List<Coder<?>> additionalCoders) { return new AutoValue_PipelineTranslation_RehydratedPTransform( 
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java index 572384b..06d1074 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java @@ -22,9 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.util.Collections; @@ -83,12 +81,8 @@ public class ReadTranslation { .setSpec( FunctionSpec.newBuilder() .setUrn(JAVA_SERIALIZED_BOUNDED_SOURCE) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom(SerializableUtils.serializeToByteArray(source))) - .build()))) + .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(source))) + .build()) .build(); } @@ -99,9 +93,7 @@ public class ReadTranslation { payload .getSource() .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() + .getPayload() .toByteArray(), "BoundedSource"); } @@ -122,11 +114,13 @@ public class ReadTranslation { private static <T> ReadPayload getReadPayload( AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform) throws IOException { - return PTransformTranslation.toProto( - transform, 
Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create()) - .getSpec() - .getParameter() - .unpack(ReadPayload.class); + return ReadPayload.parseFrom( + PTransformTranslation.toProto( + transform, + Collections.<AppliedPTransform<?, ?, ?>>emptyList(), + SdkComponents.create()) + .getSpec() + .getPayload()); } private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) { @@ -134,12 +128,8 @@ public class ReadTranslation { .setSpec( FunctionSpec.newBuilder() .setUrn(JAVA_SERIALIZED_UNBOUNDED_SOURCE) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom(SerializableUtils.serializeToByteArray(source))) - .build()))) + .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(source))) + .build()) .build(); } @@ -150,9 +140,7 @@ public class ReadTranslation { payload .getSource() .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() + .getPayload() .toByteArray(), "BoundedSource"); } @@ -160,13 +148,13 @@ public class ReadTranslation { public static PCollection.IsBounded sourceIsBounded(AppliedPTransform<?, ?, ?> transform) { try { return PCollectionTranslation.fromProto( - PTransformTranslation.toProto( - transform, - Collections.<AppliedPTransform<?, ?, ?>>emptyList(), - SdkComponents.create()) - .getSpec() - .getParameter() - .unpack(ReadPayload.class) + ReadPayload.parseFrom( + PTransformTranslation.toProto( + transform, + Collections.<AppliedPTransform<?, ?, ?>>emptyList(), + SdkComponents.create()) + .getSpec() + .getPayload()) .getIsBounded()); } catch (IOException e) { throw new RuntimeException("Internal error determining boundedness of Read", e); @@ -195,7 +183,7 @@ public class ReadTranslation { ReadPayload payload = toProto(transform.getTransform()); return RunnerApi.FunctionSpec.newBuilder() .setUrn(getUrn(transform.getTransform())) - .setParameter(Any.pack(payload)) + .setPayload(payload.toByteString()) .build(); } } @@ -222,7 +210,7 @@ public class 
ReadTranslation { ReadPayload payload = toProto(transform.getTransform()); return RunnerApi.FunctionSpec.newBuilder() .setUrn(getUrn(transform.getTransform())) - .setParameter(Any.pack(payload)) + .setPayload(payload.toByteString()) .build(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java index f23b2ec..cac7cdc 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN; import com.google.auto.service.AutoService; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.ArrayList; @@ -96,7 +95,7 @@ public class TestStreamTranslation { TestStream.class.getSimpleName(), transformProto.getSpec().getUrn()); RunnerApi.TestStreamPayload testStreamPayload = - transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class); + RunnerApi.TestStreamPayload.parseFrom(transformProto.getSpec().getPayload()); return (TestStream<T>) fromProto( @@ -185,7 +184,7 @@ public class TestStreamTranslation { throws IOException { return RunnerApi.FunctionSpec.newBuilder() .setUrn(getUrn(transform.getTransform())) - .setParameter(Any.pack(testStreamToPayload(transform.getTransform(), components))) + 
.setPayload(testStreamToPayload(transform.getTransform(), components).toByteString()) .build(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java index 6aec908..94ef22d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java @@ -21,7 +21,6 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; import com.google.auto.service.AutoService; -import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.util.Collections; @@ -54,8 +53,8 @@ public class WindowIntoTranslation { AppliedPTransform<?, ?, Window.Assign<?>> transform, SdkComponents components) { return FunctionSpec.newBuilder() .setUrn("urn:beam:transform:window:v1") - .setParameter( - Any.pack(WindowIntoTranslation.toProto(transform.getTransform(), components))) + .setPayload( + WindowIntoTranslation.toProto(transform.getTransform(), components).toByteString()) .build(); } } @@ -88,7 +87,7 @@ public class WindowIntoTranslation { WindowIntoPayload windowIntoPayload; try { - return transformProto.getSpec().getParameter().unpack(WindowIntoPayload.class); + return WindowIntoPayload.parseFrom(transformProto.getSpec().getPayload()); } catch (InvalidProtocolBufferException exc) { throw new IllegalStateException( String.format( @@ -128,7 +127,7 @@ public class 
WindowIntoTranslation { WindowIntoPayload payload = toProto(transform.getTransform(), components); return RunnerApi.FunctionSpec.newBuilder() .setUrn(getUrn(transform.getTransform())) - .setParameter(Any.pack(payload)) + .setPayload(payload.toByteString()) .build(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 565b552..ab50ea2 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -31,6 +31,9 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime; import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; import org.apache.beam.sdk.common.runner.v1.StandardWindowFns; +import org.apache.beam.sdk.common.runner.v1.StandardWindowFns.FixedWindowsPayload; +import org.apache.beam.sdk.common.runner.v1.StandardWindowFns.SessionsPayload; +import org.apache.beam.sdk.common.runner.v1.StandardWindowFns.SlidingWindowsPayload; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Sessions; @@ -199,77 +202,65 @@ public class WindowingStrategyTranslation implements Serializable { public static SdkFunctionSpec toProto( WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) { // TODO: 
Set environment IDs + ByteString serializedFn = ByteString.copyFrom(SerializableUtils.serializeToByteArray(windowFn)); if (USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN) { return SdkFunctionSpec.newBuilder() .setSpec( FunctionSpec.newBuilder() .setUrn(OLD_SERIALIZED_JAVA_WINDOWFN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(windowFn))) - .build()))) + .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build())) + .setPayload(serializedFn) + .build()) .build(); } else if (windowFn instanceof GlobalWindows) { return SdkFunctionSpec.newBuilder() .setSpec(FunctionSpec.newBuilder().setUrn(GLOBAL_WINDOWS_FN)) .build(); } else if (windowFn instanceof FixedWindows) { + FixedWindowsPayload fixedWindowsPayload = + FixedWindowsPayload.newBuilder() + .setSize(Durations.fromMillis(((FixedWindows) windowFn).getSize().getMillis())) + .setOffset(Timestamps.fromMillis(((FixedWindows) windowFn).getOffset().getMillis())) + .build(); return SdkFunctionSpec.newBuilder() .setSpec( FunctionSpec.newBuilder() .setUrn(FIXED_WINDOWS_FN) - .setParameter( - Any.pack( - StandardWindowFns.FixedWindowsPayload.newBuilder() - .setSize(Durations.fromMillis( - ((FixedWindows) windowFn).getSize().getMillis())) - .setOffset(Timestamps.fromMillis( - ((FixedWindows) windowFn).getOffset().getMillis())) - .build()))) + .setAnyParam(Any.pack(fixedWindowsPayload)) + .setPayload(fixedWindowsPayload.toByteString())) .build(); } else if (windowFn instanceof SlidingWindows) { + SlidingWindowsPayload slidingWindowsPayload = SlidingWindowsPayload.newBuilder() + .setSize(Durations.fromMillis(((SlidingWindows) windowFn).getSize().getMillis())) + .setOffset(Timestamps.fromMillis(((SlidingWindows) windowFn).getOffset().getMillis())) + .setPeriod(Durations.fromMillis(((SlidingWindows) windowFn).getPeriod().getMillis())) + .build(); return SdkFunctionSpec.newBuilder() .setSpec( FunctionSpec.newBuilder() 
.setUrn(SLIDING_WINDOWS_FN) - .setParameter( - Any.pack( - StandardWindowFns.SlidingWindowsPayload.newBuilder() - .setSize(Durations.fromMillis( - ((SlidingWindows) windowFn).getSize().getMillis())) - .setOffset(Timestamps.fromMillis( - ((SlidingWindows) windowFn).getOffset().getMillis())) - .setPeriod(Durations.fromMillis( - ((SlidingWindows) windowFn).getPeriod().getMillis())) - .build()))) + .setAnyParam(Any.pack(slidingWindowsPayload)) + .setPayload(slidingWindowsPayload.toByteString())) .build(); } else if (windowFn instanceof Sessions) { + SessionsPayload sessionsPayload = + SessionsPayload.newBuilder() + .setGapSize(Durations.fromMillis(((Sessions) windowFn).getGapDuration().getMillis())) + .build(); return SdkFunctionSpec.newBuilder() .setSpec( FunctionSpec.newBuilder() .setUrn(SESSION_WINDOWS_FN) - .setParameter( - Any.pack( - StandardWindowFns.SessionsPayload.newBuilder() - .setGapSize(Durations.fromMillis( - ((Sessions) windowFn).getGapDuration().getMillis())) - .build()))) + .setAnyParam(Any.pack(sessionsPayload)) + .setPayload(sessionsPayload.toByteString())) .build(); } else { return SdkFunctionSpec.newBuilder() .setSpec( FunctionSpec.newBuilder() .setUrn(SERIALIZED_JAVA_WINDOWFN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(windowFn))) - .build()))) + .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build())) + .setPayload(serializedFn)) .build(); } } @@ -365,49 +356,41 @@ public class WindowingStrategyTranslation implements Serializable { case GLOBAL_WINDOWS_FN: return new GlobalWindows(); case FIXED_WINDOWS_FN: - StandardWindowFns.FixedWindowsPayload fixedParams = - windowFnSpec - .getSpec() - .getParameter() - .unpack(StandardWindowFns.FixedWindowsPayload.class); + StandardWindowFns.FixedWindowsPayload fixedParams = null; + fixedParams = + StandardWindowFns.FixedWindowsPayload.parseFrom( + windowFnSpec.getSpec().getPayload()); return 
FixedWindows.of(Duration.millis(Durations.toMillis(fixedParams.getSize()))) .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset()))); case SLIDING_WINDOWS_FN: StandardWindowFns.SlidingWindowsPayload slidingParams = - windowFnSpec - .getSpec() - .getParameter() - .unpack(StandardWindowFns.SlidingWindowsPayload.class); + StandardWindowFns.SlidingWindowsPayload.parseFrom( + windowFnSpec.getSpec().getPayload()); return SlidingWindows.of(Duration.millis(Durations.toMillis(slidingParams.getSize()))) .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod()))) .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset()))); case SESSION_WINDOWS_FN: StandardWindowFns.SessionsPayload sessionParams = - windowFnSpec.getSpec().getParameter().unpack(StandardWindowFns.SessionsPayload.class); + StandardWindowFns.SessionsPayload.parseFrom(windowFnSpec.getSpec().getPayload()); return Sessions.withGapDuration( Duration.millis(Durations.toMillis(sessionParams.getGapSize()))); case SERIALIZED_JAVA_WINDOWFN_URN: case OLD_SERIALIZED_JAVA_WINDOWFN_URN: return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray( - windowFnSpec - .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() - .toByteArray(), - "WindowFn"); + windowFnSpec.getSpec().getPayload().toByteArray(), "WindowFn"); default: throw new IllegalArgumentException( "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn()); } - } catch (InvalidProtocolBufferException exc) { + } catch (InvalidProtocolBufferException e) { throw new IllegalArgumentException( String.format( "%s for %s with URN %s did not contain expected proto message for payload", FunctionSpec.class.getSimpleName(), WindowFn.class.getSimpleName(), - windowFnSpec.getSpec().getUrn())); + windowFnSpec.getSpec().getUrn()), + e); } } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java index 7954b0e..aeefd4f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java @@ -25,9 +25,7 @@ import com.google.auto.service.AutoService; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.io.Serializable; import java.util.Collections; @@ -83,13 +81,9 @@ public class WriteFilesTranslation { .setSpec( FunctionSpec.newBuilder() .setUrn(urn) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(serializable))) - .build()))) + .setPayload( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(serializable))) + .build()) .build(); } @@ -102,8 +96,7 @@ public class WriteFilesTranslation { FunctionSpec.class.getSimpleName(), sinkProto.getSpec().getUrn()); - byte[] serializedSink = - sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(); + byte[] serializedSink = sinkProto.getSpec().getPayload().toByteArray(); return (FileBasedSink<?, ?, ?>) SerializableUtils.deserializeFromByteArray( @@ -163,11 +156,13 @@ public class WriteFilesTranslation { 
AppliedPTransform<PCollection<T>, PDone, ? extends PTransform<PCollection<T>, PDone>> transform) throws IOException { - return PTransformTranslation.toProto( - transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create()) - .getSpec() - .getParameter() - .unpack(WriteFilesPayload.class); + return WriteFilesPayload.parseFrom( + PTransformTranslation.toProto( + transform, + Collections.<AppliedPTransform<?, ?, ?>>emptyList(), + SdkComponents.create()) + .getSpec() + .getPayload()); } static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> { @@ -181,7 +176,7 @@ public class WriteFilesTranslation { AppliedPTransform<?, ?, WriteFiles<?, ?, ?>> transform, SdkComponents components) { return FunctionSpec.newBuilder() .setUrn(getUrn(transform.getTransform())) - .setParameter(Any.pack(toProto(transform.getTransform()))) + .setPayload(toProto(transform.getTransform()).toByteString()) .build(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java index 0d209a0..4f57af8 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java @@ -21,7 +21,6 @@ package org.apache.beam.runners.core.construction; import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; -import com.google.protobuf.BytesValue; 
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -98,8 +97,7 @@ public class CreatePCollectionViewTranslationTest { PCollectionView<?> deserializedView = (PCollectionView<?>) SerializableUtils.deserializeFromByteArray( - payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(), - PCollectionView.class.getSimpleName()); + payload.getPayload().toByteArray(), PCollectionView.class.getSimpleName()); assertThat( deserializedView, Matchers.<PCollectionView<?>>equalTo(createViewTransform.getView())); @@ -126,7 +124,7 @@ public class CreatePCollectionViewTranslationTest { PCollectionView<?> deserializedView = (PCollectionView<?>) SerializableUtils.deserializeFromByteArray( - payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(), + payload.getPayload().toByteArray(), PCollectionView.class.getSimpleName()); assertThat( http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java index c31e803..680f940 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java @@ -156,8 +156,7 @@ public class ParDoTranslationTest { // Decode Pipeline rehydratedPipeline = Pipeline.create(); - ParDoPayload parDoPayload = - protoTransform.getSpec().getParameter().unpack(ParDoPayload.class); + ParDoPayload parDoPayload = 
ParDoPayload.parseFrom(protoTransform.getSpec().getPayload()); for (PCollectionView<?> view : parDo.getSideInputs()) { SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId()); PCollectionView<?> restoredView = http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java index e4336df..893f4b9 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.TestStreamPayload; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; @@ -100,8 +101,7 @@ public class TestStreamTranslationTest { assertThat(spec.getUrn(), equalTo(TEST_STREAM_TRANSFORM_URN)); - RunnerApi.TestStreamPayload payload = - spec.getParameter().unpack(RunnerApi.TestStreamPayload.class); + RunnerApi.TestStreamPayload payload = TestStreamPayload.parseFrom(spec.getPayload()); verifyTestStreamEncoding( testStream, payload, RehydratedComponents.forComponents(components.toComponents())); 
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 9afb565..fb5d47e 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -755,9 +755,12 @@ message FunctionSpec { // passed as-is. string urn = 1; + // (Deprecated) + google.protobuf.Any any_param = 2; + // (Optional) The data specifying any parameters to the URN. If // the URN does not require any arguments, this may be omitted. - google.protobuf.Any parameter = 2; + bytes payload = 3; } // TODO: transfer javadoc here http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java index 1e611db..df0e5a2 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java @@ -129,8 +129,8 @@ public class BeamFnDataReadRunner<OutputT> { BeamFnDataClient beamFnDataClientFactory, Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers) throws IOException { - this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class) - .getApiServiceDescriptor(); + this.apiServiceDescriptor = + BeamFnApi.RemoteGrpcPort.parseFrom(functionSpec.getPayload()).getApiServiceDescriptor(); this.inputTarget = inputTarget; this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier; 
this.beamFnDataClientFactory = beamFnDataClientFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java index bbed753..48b450a 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java @@ -120,8 +120,8 @@ public class BeamFnDataWriteRunner<InputT> { Map<String, RunnerApi.Coder> coders, BeamFnDataClient beamFnDataClientFactory) throws IOException { - this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class) - .getApiServiceDescriptor(); + this.apiServiceDescriptor = + BeamFnApi.RemoteGrpcPort.parseFrom(functionSpec.getPayload()).getApiServiceDescriptor(); this.beamFnDataClientFactory = beamFnDataClientFactory; this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier; this.outputTarget = outputTarget; http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java index 4702e05..5f6509f 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java @@ -22,7 +22,6 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; -import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.util.Collection; @@ -122,17 +121,14 @@ public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> public void start() throws Exception { try { // The representation here is defined as the java serialized representation of the - // bounded source object packed into a protobuf Any using a protobuf BytesValue wrapper. - byte[] bytes = definition.getParameter().unpack(BytesValue.class).getValue().toByteArray(); + // bounded source object in a ByteString wrapper. + byte[] bytes = definition.getPayload().toByteArray(); @SuppressWarnings("unchecked") InputT boundedSource = (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString()); runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource)); } catch (InvalidProtocolBufferException e) { - throw new IOException( - String.format("Failed to decode %s, expected %s", - definition.getParameter().getTypeUrl(), BytesValue.getDescriptor().getFullName()), - e); + throw new IOException(String.format("Failed to decode %s", definition.getUrn()), e); } } http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 97bd71c..86168f9 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -25,8 +25,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; import 
com.google.common.collect.Multimap; import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; -import com.google.protobuf.InvalidProtocolBufferException; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; @@ -109,13 +107,7 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp outputMapBuilder.build(); // Get the DoFnInfo from the serialized blob. - ByteString serializedFn; - try { - serializedFn = pTransform.getSpec().getParameter().unpack(BytesValue.class).getValue(); - } catch (InvalidProtocolBufferException e) { - throw new IllegalArgumentException( - String.format("Unable to unwrap DoFn %s", pTransform.getSpec()), e); - } + ByteString serializedFn = pTransform.getSpec().getPayload(); @SuppressWarnings({"unchecked", "rawtypes"}) DoFnInfo<InputT, OutputT> doFnInfo = (DoFnInfo) SerializableUtils.deserializeFromByteArray( serializedFn.toByteArray(), "DoFnInfo"); http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java index d712f5f..92e6088 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java @@ -37,7 +37,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.Uninterruptibles; -import com.google.protobuf.Any; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -80,7 +79,7 @@ public class BeamFnDataReadRunnerTest { private static final 
BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder() .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build(); private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder() - .setParameter(Any.pack(PORT_SPEC)).build(); + .setPayload(PORT_SPEC.toByteString()).build(); private static final Coder<WindowedValue<String>> CODER = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); private static final String CODER_SPEC_ID = "string-coder-id"; @@ -131,7 +130,7 @@ public class BeamFnDataReadRunnerTest { RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder() .setUrn("urn:org.apache.beam:source:runner:0.1") - .setParameter(Any.pack(PORT_SPEC)) + .setPayload(PORT_SPEC.toByteString()) .build(); RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder() http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java index 0caf19e..ffa3a2d 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java @@ -37,7 +37,6 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; -import com.google.protobuf.Any; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -75,7 +74,7 @@ public class BeamFnDataWriteRunnerTest { private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder() 
.setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build(); private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder() - .setParameter(Any.pack(PORT_SPEC)).build(); + .setPayload(PORT_SPEC.toByteString()).build(); private static final String CODER_ID = "string-coder-id"; private static final Coder<WindowedValue<String>> CODER = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); @@ -117,7 +116,7 @@ public class BeamFnDataWriteRunnerTest { RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder() .setUrn("urn:org.apache.beam:sink:runner:0.1") - .setParameter(Any.pack(PORT_SPEC)) + .setPayload(PORT_SPEC.toByteString()) .build(); RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder() http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java index 7aec161..b9f22e8 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java @@ -31,9 +31,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -107,8 +105,7 @@ public class BoundedSourceRunnerTest { BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>( 
PipelineOptionsFactory.create(), - RunnerApi.FunctionSpec.newBuilder().setParameter( - Any.pack(BytesValue.newBuilder().setValue(encodedSource).build())).build(), + RunnerApi.FunctionSpec.newBuilder().setPayload(encodedSource).build(), consumers); runner.start(); @@ -127,13 +124,12 @@ public class BoundedSourceRunnerTest { List<ThrowingRunnable> startFunctions = new ArrayList<>(); List<ThrowingRunnable> finishFunctions = new ArrayList<>(); - RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder() - .setUrn("urn:org.apache.beam:source:java:0.1") - .setParameter(Any.pack(BytesValue.newBuilder() - .setValue(ByteString.copyFrom( - SerializableUtils.serializeToByteArray(CountingSource.upTo(3)))) - .build())) - .build(); + RunnerApi.FunctionSpec functionSpec = + RunnerApi.FunctionSpec.newBuilder() + .setUrn("urn:org.apache.beam:source:java:0.1") + .setPayload( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3)))) + .build(); RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder() .setSpec(functionSpec) http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index e269bcc..efa8fcf 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -31,9 +31,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import 
java.util.ArrayList; import java.util.List; import java.util.ServiceLoader; @@ -102,12 +100,11 @@ public class FnApiDoFnRunnerTest { ImmutableMap.of( Long.parseLong(mainOutputId), TestDoFn.mainOutput, Long.parseLong(additionalOutputId), TestDoFn.additionalOutput)); - RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder() - .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN) - .setParameter(Any.pack(BytesValue.newBuilder() - .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo))) - .build())) - .build(); + RunnerApi.FunctionSpec functionSpec = + RunnerApi.FunctionSpec.newBuilder() + .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN) + .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo))) + .build(); RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder() .setSpec(functionSpec) .putInputs("inputA", "inputATarget") http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/coders/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index c56ef52..7ced5a9 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -206,22 +206,21 @@ class Coder(object): """For internal use only; no backwards-compatibility guarantees. """ # TODO(BEAM-115): Use specialized URNs and components. + serialized_coder = serialize_coder(self) return beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( urn=urns.PICKLED_CODER, - parameter=proto_utils.pack_Any( + any_param=proto_utils.pack_Any( google.protobuf.wrappers_pb2.BytesValue( - value=serialize_coder(self)))))) + value=serialized_coder)), + payload=serialized_coder))) @staticmethod def from_runner_api(proto, context): """For internal use only; no backwards-compatibility guarantees. 
""" - any_proto = proto.spec.spec.parameter - bytes_proto = google.protobuf.wrappers_pb2.BytesValue() - any_proto.Unpack(bytes_proto) - return deserialize_coder(bytes_proto.value) + return deserialize_coder(proto.spec.spec.payload) class StrUtf8Coder(Coder): http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/runners/portability/fn_api_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 3222bcb..7c0c06f 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -218,16 +218,16 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): def side_inputs(self): for transform in self.transforms: if transform.spec.urn == urns.PARDO_TRANSFORM: - payload = proto_utils.unpack_Any( - transform.spec.parameter, beam_runner_api_pb2.ParDoPayload) + payload = proto_utils.parse_Bytes( + transform.spec.payload, beam_runner_api_pb2.ParDoPayload) for side_input in payload.side_inputs: yield transform.inputs[side_input] def has_as_main_input(self, pcoll): for transform in self.transforms: if transform.spec.urn == urns.PARDO_TRANSFORM: - payload = proto_utils.unpack_Any( - transform.spec.parameter, beam_runner_api_pb2.ParDoPayload) + payload = proto_utils.parse_Bytes( + transform.spec.payload, beam_runner_api_pb2.ParDoPayload) local_side_inputs = payload.side_inputs else: local_side_inputs = {} @@ -257,9 +257,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): transform = stage.transforms[0] if transform.spec.urn == urns.GROUP_BY_KEY_ONLY_TRANSFORM: # This is used later to correlate the read and write. 
- param = proto_utils.pack_Any( - wrappers_pb2.BytesValue( - value=str("group:%s" % stage.name))) + param = str("group:%s" % stage.name) gbk_write = Stage( transform.unique_name + '/Write', [beam_runner_api_pb2.PTransform( @@ -267,7 +265,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): inputs=transform.inputs, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_OUTPUT_URN, - parameter=param))], + any_param=proto_utils.pack_Any( + wrappers_pb2.BytesValue(value=param)), + payload=param))], downstream_side_inputs=frozenset(), must_follow=stage.must_follow) yield gbk_write @@ -279,7 +279,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): outputs=transform.outputs, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_INPUT_URN, - parameter=param))], + any_param=proto_utils.pack_Any( + wrappers_pb2.BytesValue(value=param)), + payload=param))], downstream_side_inputs=frozenset(), must_follow=union(frozenset([gbk_write]), stage.must_follow)) else: @@ -299,9 +301,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): transform = stage.transforms[0] if transform.spec.urn == urns.FLATTEN_TRANSFORM: # This is used later to correlate the read and writes. 
- param = proto_utils.pack_Any( - wrappers_pb2.BytesValue( - value=str("materialize:%s" % transform.unique_name))) + param = str("materialize:%s" % transform.unique_name) output_pcoll_id, = transform.outputs.values() output_coder_id = pcollections[output_pcoll_id].coder_id flatten_writes = [] @@ -337,7 +337,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): inputs={local_in: transcoded_pcollection}, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_OUTPUT_URN, - parameter=param))], + any_param=proto_utils.pack_Any( + wrappers_pb2.BytesValue( + value=param)), + payload=param))], downstream_side_inputs=frozenset(), must_follow=stage.must_follow) flatten_writes.append(flatten_write) @@ -350,7 +353,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): outputs=transform.outputs, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_INPUT_URN, - parameter=param))], + any_param=proto_utils.pack_Any( + wrappers_pb2.BytesValue( + value=param)), + payload=param))], downstream_side_inputs=frozenset(), must_follow=union(frozenset(flatten_writes), stage.must_follow)) @@ -439,9 +445,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): # Now try to fuse away all pcollections. 
for pcoll, producer in producers_by_pcoll.items(): - pcoll_as_param = proto_utils.pack_Any( - wrappers_pb2.BytesValue( - value=str("materialize:%s" % pcoll))) + pcoll_as_param = str("materialize:%s" % pcoll) write_pcoll = None for consumer in consumers_by_pcoll[pcoll]: producer = replacement(producer) @@ -461,7 +465,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): inputs={'in': pcoll}, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_OUTPUT_URN, - parameter=pcoll_as_param))]) + any_param=proto_utils.pack_Any( + wrappers_pb2.BytesValue( + value=pcoll_as_param)), + payload=pcoll_as_param))]) fuse(producer, write_pcoll) if consumer.has_as_main_input(pcoll): read_pcoll = Stage( @@ -471,7 +478,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): outputs={'out': pcoll}, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_INPUT_URN, - parameter=pcoll_as_param))], + any_param=proto_utils.pack_Any( + wrappers_pb2.BytesValue( + value=pcoll_as_param)), + payload=pcoll_as_param))], must_follow={write_pcoll}) fuse(read_pcoll, consumer) @@ -567,8 +577,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): data_side_input = {} data_output = {} for transform in stage.transforms: - pcoll_id = proto_utils.unpack_Any( - transform.spec.parameter, wrappers_pb2.BytesValue).value + pcoll_id = transform.spec.payload if transform.spec.urn in (bundle_processor.DATA_INPUT_URN, bundle_processor.DATA_OUTPUT_URN): if transform.spec.urn == bundle_processor.DATA_INPUT_URN: @@ -580,9 +589,11 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): else: raise NotImplementedError if data_operation_spec: - transform.spec.parameter.CopyFrom(data_operation_spec) + transform.spec.payload = data_operation_spec + transform.spec.any_param.CopyFrom(data_operation_spec) else: - transform.spec.parameter.Clear() + transform.spec.payload = "" + transform.spec.any_param.Clear() return data_input, data_side_input, 
data_output logging.info('Running %s', stage.name) @@ -728,7 +739,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): runner_sinks[(transform_id, target_name)] = operation transform_spec = beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_OUTPUT_URN, - parameter=proto_utils.pack_Any(data_operation_spec)) + any_param=proto_utils.pack_Any(data_operation_spec), + payload=data_operation_spec.SerializeToString() \ + if data_operation_spec is not None else None) elif isinstance(operation, operation_specs.WorkerRead): # A Read from an in-memory source is done over the data plane. @@ -742,19 +755,23 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): operation.source.source.default_output_coder()) transform_spec = beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_INPUT_URN, - parameter=proto_utils.pack_Any(data_operation_spec)) + any_param=proto_utils.pack_Any(data_operation_spec), + payload=data_operation_spec.SerializeToString() \ + if data_operation_spec is not None else None) else: # Otherwise serialize the source and execute it there. # TODO: Use SDFs with an initial impulse. # The Dataflow runner harness strips the base64 encoding. do the same # here until we get the same thing back that we sent in. + source_bytes = base64.b64decode( + pickler.dumps(operation.source.source)) transform_spec = beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.PYTHON_SOURCE_URN, - parameter=proto_utils.pack_Any( + any_param=proto_utils.pack_Any( wrappers_pb2.BytesValue( - value=base64.b64decode( - pickler.dumps(operation.source.source))))) + value=source_bytes)), + payload=source_bytes) elif isinstance(operation, operation_specs.WorkerDoFn): # Record the contents of each side input for access via the state api. 
@@ -773,8 +790,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): (operation.serialized_fn, side_input_extras)) transform_spec = beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.PYTHON_DOFN_URN, - parameter=proto_utils.pack_Any( - wrappers_pb2.BytesValue(value=augmented_serialized_fn))) + any_param=proto_utils.pack_Any( + wrappers_pb2.BytesValue(value=augmented_serialized_fn)), + payload=augmented_serialized_fn) elif isinstance(operation, operation_specs.WorkerFlatten): # Flatten is nice and simple. http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/runners/worker/bundle_processor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 9474eda..16c888c 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -282,9 +282,9 @@ class BeamTransformFactory(object): def create_operation(self, transform_id, consumers): transform_proto = self.descriptor.transforms[transform_id] creator, parameter_type = self._known_urns[transform_proto.spec.urn] - parameter = proto_utils.unpack_Any( - transform_proto.spec.parameter, parameter_type) - return creator(self, transform_id, transform_proto, parameter, consumers) + payload = proto_utils.parse_Bytes( + transform_proto.spec.payload, parameter_type) + return creator(self, transform_id, transform_proto, payload, consumers) def get_coder(self, coder_id): coder_proto = self.descriptor.coders[coder_id] @@ -293,9 +293,7 @@ class BeamTransformFactory(object): else: # No URN, assume cloud object encoding json bytes. 
return operation_specs.get_coder_from_spec( - json.loads( - proto_utils.unpack_Any(coder_proto.spec.spec.parameter, - wrappers_pb2.BytesValue).value)) + json.loads(coder_proto.spec.spec.payload)) def get_output_coders(self, transform_proto): return { @@ -360,10 +358,10 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers): data_channel=factory.data_channel_factory.create_data_channel(grpc_port)) -@BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, wrappers_pb2.BytesValue) +@BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, None) def create(factory, transform_id, transform_proto, parameter, consumers): # The Dataflow runner harness strips the base64 encoding. - source = pickler.loads(base64.b64encode(parameter.value)) + source = pickler.loads(base64.b64encode(parameter)) spec = operation_specs.WorkerRead( iobase.SourceBundle(1.0, source, None, None), [WindowedValueCoder(source.default_output_coder())]) @@ -395,9 +393,9 @@ def create(factory, transform_id, transform_proto, parameter, consumers): consumers) -@BeamTransformFactory.register_urn(PYTHON_DOFN_URN, wrappers_pb2.BytesValue) +@BeamTransformFactory.register_urn(PYTHON_DOFN_URN, None) def create(factory, transform_id, transform_proto, parameter, consumers): - dofn_data = pickler.loads(parameter.value) + dofn_data = pickler.loads(parameter) if len(dofn_data) == 2: # Has side input data. serialized_fn, side_input_data = dofn_data @@ -413,8 +411,7 @@ def create(factory, transform_id, transform_proto, parameter, consumers): urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload) def create(factory, transform_id, transform_proto, parameter, consumers): assert parameter.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO - serialized_fn = proto_utils.unpack_Any( - parameter.do_fn.spec.parameter, wrappers_pb2.BytesValue).value + serialized_fn = parameter.do_fn.spec.payload dofn_data = pickler.loads(serialized_fn) if len(dofn_data) == 2: # Has side input data. 
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 3f92ce9..9018a49 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -699,24 +699,24 @@ class ParDo(PTransformWithSideInputs): def to_runner_api_parameter(self, context): assert self.__class__ is ParDo + picked_pardo_fn_data = pickler.dumps(self._pardo_fn_data()) return ( urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload( do_fn=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( urn=urns.PICKLED_DO_FN_INFO, - parameter=proto_utils.pack_Any( + any_param=proto_utils.pack_Any( wrappers_pb2.BytesValue( - value=pickler.dumps( - self._pardo_fn_data()))))))) + value=picked_pardo_fn_data)), + payload=picked_pardo_fn_data)))) @PTransform.register_urn( urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload) def from_runner_api_parameter(pardo_payload, context): assert pardo_payload.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO fn, args, kwargs, si_tags_and_types, windowing = pickler.loads( - proto_utils.unpack_Any( - pardo_payload.do_fn.spec.parameter, wrappers_pb2.BytesValue).value) + pardo_payload.do_fn.spec.payload) if si_tags_and_types: raise NotImplementedError('deferred side inputs') elif windowing: http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index da113e0..a798fa1 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -442,7 +442,9 @@ class PTransform(WithTypeHints, HasDisplayData): urn, typed_param = 
self.to_runner_api_parameter(context) return beam_runner_api_pb2.FunctionSpec( urn=urn, - parameter=proto_utils.pack_Any(typed_param)) + any_param=proto_utils.pack_Any(typed_param), + payload=typed_param.SerializeToString() + if typed_param is not None else None) @classmethod def from_runner_api(cls, proto, context): @@ -450,7 +452,7 @@ class PTransform(WithTypeHints, HasDisplayData): return None parameter_type, constructor = cls._known_urns[proto.urn] return constructor( - proto_utils.unpack_Any(proto.parameter, parameter_type), + proto_utils.parse_Bytes(proto.payload, parameter_type), context) def to_runner_api_parameter(self, context): http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/utils/proto_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py index af8f218..d7693f3 100644 --- a/sdks/python/apache_beam/utils/proto_utils.py +++ b/sdks/python/apache_beam/utils/proto_utils.py @@ -46,6 +46,17 @@ def unpack_Any(any_msg, msg_class): return msg +def parse_Bytes(bytes, msg_class): + """Parses the String of bytes into msg_class. + + Returns the input bytes if msg_class is None.""" + if msg_class is None: + return bytes + msg = msg_class() + msg.ParseFromString(bytes) + return msg + + def pack_Struct(**kwargs): """Returns a struct containing the values indicated by kwargs. 
""" http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/utils/urns.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py index 0013cb3..acf729f 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -120,7 +120,9 @@ class RunnerApiFn(object): return beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( urn=urn, - parameter=proto_utils.pack_Any(typed_param))) + any_param=proto_utils.pack_Any(typed_param), + payload=typed_param.SerializeToString() + if typed_param is not None else None)) @classmethod def from_runner_api(cls, fn_proto, context): @@ -130,5 +132,5 @@ class RunnerApiFn(object): """ parameter_type, constructor = cls._known_urns[fn_proto.spec.urn] return constructor( - proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type), + proto_utils.parse_Bytes(fn_proto.spec.payload, parameter_type), context)