Remove References to CloudObject from the Java Harness Migrates to using the shared Runner API definitions.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/64cf18fc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/64cf18fc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/64cf18fc Branch: refs/heads/master Commit: 64cf18fcdb4237189a5212b6476bdadf73a2ac7f Parents: 3c81766 Author: Thomas Groh <tg...@google.com> Authored: Wed Jul 26 15:34:23 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue Aug 1 14:22:21 2017 -0700 ---------------------------------------------------------------------- .../beam/fn/harness/BeamFnDataReadRunner.java | 27 ++++++++---------- .../beam/fn/harness/BeamFnDataWriteRunner.java | 22 ++++++--------- .../fn/harness/BeamFnDataReadRunnerTest.java | 28 +++++++++---------- .../fn/harness/BeamFnDataWriteRunnerTest.java | 24 ++++++---------- .../beam/fn/harness/FnApiDoFnRunnerTest.java | 29 -------------------- 5 files changed, 41 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java index e2c17b0..1e611db 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -35,8 +34,8 @@ import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.dataflow.util.CloudObject; -import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; @@ -91,8 +90,9 @@ public class BeamFnDataReadRunner<OutputT> { .setPrimitiveTransformReference(pTransformId) .setName(getOnlyElement(pTransform.getOutputsMap().keySet())) .build(); - RunnerApi.Coder coderSpec = coders.get(pCollections.get( - getOnlyElement(pTransform.getOutputsMap().values())).getCoderId()); + RunnerApi.Coder coderSpec = + coders.get( + pCollections.get(getOnlyElement(pTransform.getOutputsMap().values())).getCoderId()); Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers = (Collection) pCollectionIdsToConsumers.get( getOnlyElement(pTransform.getOutputsMap().values())); @@ -102,6 +102,7 @@ public class BeamFnDataReadRunner<OutputT> { processBundleInstructionId, target, coderSpec, + coders, beamFnDataClient, consumers); addStartFunction.accept(runner::registerInputLocation); @@ -124,6 +125,7 @@ public class BeamFnDataReadRunner<OutputT> { Supplier<String> processBundleInstructionIdSupplier, BeamFnApi.Target inputTarget, RunnerApi.Coder coderSpec, + Map<String, RunnerApi.Coder> coders, BeamFnDataClient beamFnDataClientFactory, Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers) throws IOException { @@ -137,17 +139,10 @@ public class BeamFnDataReadRunner<OutputT> { @SuppressWarnings("unchecked") Coder<WindowedValue<OutputT>> coder = (Coder<WindowedValue<OutputT>>) - CloudObjects.coderFromCloudObject( - CloudObject.fromSpec( - OBJECT_MAPPER.readValue( - coderSpec - .getSpec() - .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() - .newInput(), - Map.class))); + CoderTranslation.fromProto( + coderSpec, + RehydratedComponents.forComponents( + RunnerApi.Components.newBuilder().putAllCoders(coders).build())); this.coder = coder; } http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java index eec4dfd..bbed753 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.Map; import java.util.function.Consumer; @@ -34,8 +33,8 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.dataflow.util.CloudObject; -import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; @@ -93,6 +92,7 @@ public class BeamFnDataWriteRunner<InputT> { processBundleInstructionId, target, coderSpec, + coders, beamFnDataClient); addStartFunction.accept(runner::registerForOutput); pCollectionIdsToConsumers.put( @@ -117,6 +117,7 @@ public class BeamFnDataWriteRunner<InputT> { Supplier<String> processBundleInstructionIdSupplier, BeamFnApi.Target outputTarget, RunnerApi.Coder coderSpec, + Map<String, RunnerApi.Coder> coders, BeamFnDataClient beamFnDataClientFactory) throws IOException { this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class) @@ -128,17 +129,10 @@ public class BeamFnDataWriteRunner<InputT> { @SuppressWarnings("unchecked") Coder<WindowedValue<InputT>> coder = (Coder<WindowedValue<InputT>>) - CloudObjects.coderFromCloudObject( - CloudObject.fromSpec( - OBJECT_MAPPER.readValue( - coderSpec - .getSpec() - .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() - .newInput(), - Map.class))); + CoderTranslation.fromProto( + coderSpec, + RehydratedComponents.forComponents( + RunnerApi.Components.newBuilder().putAllCoders(coders).build())); this.coder = coder; } http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java index a7c6666..d712f5f 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java @@ -30,7 +30,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Suppliers; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; @@ -39,8 +38,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -56,10 +53,11 @@ import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.harness.test.TestExecutors; import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -79,7 +77,6 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) public class BeamFnDataReadRunnerTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder() .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build(); private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder() @@ -88,19 +85,19 @@ public class BeamFnDataReadRunnerTest { WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); private static final String CODER_SPEC_ID = "string-coder-id"; private static final RunnerApi.Coder CODER_SPEC; + private static final RunnerApi.Components COMPONENTS; private static final String URN = "urn:org.apache.beam:source:runner:0.1"; static { try { - CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec( - RunnerApi.SdkFunctionSpec.newBuilder().setSpec( - RunnerApi.FunctionSpec.newBuilder().setParameter( - Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))) - .build())) - .build()) - .build()) - .build(); + MessageWithComponents coderAndComponents = CoderTranslation.toProto(CODER); + CODER_SPEC = coderAndComponents.getCoder(); + COMPONENTS = + coderAndComponents + .getComponents() + .toBuilder() + .putCoders(CODER_SPEC_ID, CODER_SPEC) + .build(); } catch (IOException e) { throw new ExceptionInInitializerError(e); } @@ -150,7 +147,7 @@ public class BeamFnDataReadRunnerTest { Suppliers.ofInstance(bundleId)::get, ImmutableMap.of("outputPC", RunnerApi.PCollection.newBuilder().setCoderId(CODER_SPEC_ID).build()), - ImmutableMap.of(CODER_SPEC_ID, CODER_SPEC), + COMPONENTS.getCodersMap(), consumers, startFunctions::add, finishFunctions::add); @@ -200,6 +197,7 @@ public class BeamFnDataReadRunnerTest { bundleId::get, INPUT_TARGET, CODER_SPEC, + COMPONENTS.getCodersMap(), mockBeamFnDataClient, ImmutableList.of(valuesA::add, valuesB::add)); http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java index 28838b1..0caf19e 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java @@ -32,15 +32,12 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Suppliers; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -53,10 +50,11 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -74,7 +72,6 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) public class BeamFnDataWriteRunnerTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder() .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build(); private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder() @@ -83,19 +80,15 @@ public class BeamFnDataWriteRunnerTest { private static final Coder<WindowedValue<String>> CODER = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); private static final RunnerApi.Coder CODER_SPEC; + private static final RunnerApi.Components COMPONENTS; private static final String URN = "urn:org.apache.beam:sink:runner:0.1"; static { try { - CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec( - RunnerApi.SdkFunctionSpec.newBuilder().setSpec( - RunnerApi.FunctionSpec.newBuilder().setParameter( - Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))) - .build())) - .build()) - .build()) - .build(); + MessageWithComponents coderAndComponents = CoderTranslation.toProto(CODER); + CODER_SPEC = coderAndComponents.getCoder(); + COMPONENTS = + coderAndComponents.getComponents().toBuilder().putCoders(CODER_ID, CODER_SPEC).build(); } catch (IOException e) { throw new ExceptionInInitializerError(e); } @@ -140,7 +133,7 @@ public class BeamFnDataWriteRunnerTest { Suppliers.ofInstance(bundleId)::get, ImmutableMap.of("inputPC", RunnerApi.PCollection.newBuilder().setCoderId(CODER_ID).build()), - ImmutableMap.of(CODER_ID, CODER_SPEC), + COMPONENTS.getCodersMap(), consumers, startFunctions::add, finishFunctions::add); @@ -201,6 +194,7 @@ public class BeamFnDataWriteRunnerTest { bundleId::get, OUTPUT_TARGET, CODER_SPEC, + COMPONENTS.getCodersMap(), mockBeamFnDataClient); // Process for bundle id 0 http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index 98362a2..e269bcc 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Suppliers; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; @@ -35,19 +34,14 @@ import com.google.common.collect.Multimap; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; -import com.google.protobuf.Message; -import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.ServiceLoader; import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.runners.core.construction.ParDoTranslation; -import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.DoFnInfo; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -66,28 +60,6 @@ import org.junit.runners.JUnit4; /** Tests for {@link FnApiDoFnRunner}. */ @RunWith(JUnit4.class) public class FnApiDoFnRunnerTest { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final Coder<WindowedValue<String>> STRING_CODER = - WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); - private static final String STRING_CODER_SPEC_ID = "999L"; - private static final RunnerApi.Coder STRING_CODER_SPEC; - - static { - try { - STRING_CODER_SPEC = RunnerApi.Coder.newBuilder() - .setSpec(RunnerApi.SdkFunctionSpec.newBuilder() - .setSpec(RunnerApi.FunctionSpec.newBuilder() - .setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER)))) - .build()))) - .build()) - .build(); - } catch (IOException e) { - throw new ExceptionInInitializerError(e); - } - } - private static class TestDoFn extends DoFn<String, String> { private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput"); private static final TupleTag<String> additionalOutput = new TupleTag<>("output"); @@ -117,7 +89,6 @@ public class FnApiDoFnRunnerTest { */ @Test public void testCreatingAndProcessingDoFn() throws Exception { - Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC); String pTransformId = "pTransformId"; String mainOutputId = "101"; String additionalOutputId = "102";