Repository: beam Updated Branches: refs/heads/master 30e611646 -> d7151fb2b
Remove Coder.asCloudObject and related methods These methods belong in the Dataflow module. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/94950484 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/94950484 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/94950484 Branch: refs/heads/master Commit: 94950484a416afb2d250c4b4b45e20a16a3bbb9d Parents: 30e6116 Author: Thomas Groh <tg...@google.com> Authored: Mon May 1 13:03:18 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Tue May 2 17:59:25 2017 -0700 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 2 +- .../dataflow/DataflowPipelineTranslator.java | 6 +-- .../java/org/apache/beam/sdk/coders/Coder.java | 7 --- .../org/apache/beam/sdk/coders/CustomCoder.java | 48 -------------------- .../apache/beam/sdk/coders/IterableCoder.java | 9 ---- .../org/apache/beam/sdk/coders/KvCoder.java | 9 ---- .../beam/sdk/coders/LengthPrefixCoder.java | 6 --- .../apache/beam/sdk/coders/StructuredCoder.java | 43 ------------------ .../beam/sdk/testing/CoderProperties.java | 5 +- .../sdk/transforms/windowing/GlobalWindow.java | 6 --- .../transforms/windowing/IntervalWindow.java | 6 --- .../apache/beam/sdk/util/SerializableUtils.java | 26 ++++------- .../org/apache/beam/sdk/util/WindowedValue.java | 15 ------ .../beam/sdk/coders/IterableCoderTest.java | 10 ---- .../org/apache/beam/sdk/coders/KvCoderTest.java | 11 ----- .../transforms/windowing/GlobalWindowTest.java | 7 --- .../apache/beam/sdk/util/WindowedValueTest.java | 10 ---- sdks/java/extensions/protobuf/pom.xml | 10 ---- .../sdk/extensions/protobuf/ProtoCoder.java | 29 ------------ 19 files changed, 14 insertions(+), 251 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index c0b6328..bbad156 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -33,7 +33,7 @@ <packaging>jar</packaging> <properties> - <dataflow.container_version>beam-master-20170501-pr2718</dataflow.container_version> + <dataflow.container_version>beam-master-20170502</dataflow.container_version> <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version> <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version> </properties> http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 0c0a2ef..28a9c1c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -62,6 +62,7 @@ import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle; import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.runners.dataflow.util.OutputReference; import org.apache.beam.sdk.Pipeline; @@ -87,7 +88,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -557,7 +557,7 @@ public class DataflowPipelineTranslator { @Override public void addEncodingInput(Coder<?> coder) { - CloudObject encoding = SerializableUtils.ensureSerializable(coder); + CloudObject encoding = CloudObjects.asCloudObject(coder); addObject(getProperties(), PropertyNames.ENCODING, encoding); } @@ -669,7 +669,7 @@ public class DataflowPipelineTranslator { if (valueCoder != null) { // Verify that encoding can be decoded, in order to catch serialization // failures as early as possible. - CloudObject encoding = SerializableUtils.ensureSerializable(valueCoder); + CloudObject encoding = CloudObjects.asCloudObject(valueCoder); addObject(outputInfo, PropertyNames.ENCODING, encoding); translator.outputCoders.put(value, valueCoder); } http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index 632cf89..8ba8ad3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -31,7 +31,6 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.TypeDescriptor; @@ -142,12 +141,6 @@ public interface Coder<T> extends Serializable { List<? extends Coder<?>> getCoderArguments(); /** - * Returns the {@link CloudObject} that represents this {@code Coder}. - */ - @Deprecated - CloudObject asCloudObject(); - - /** * Throw {@link NonDeterministicException} if the coding is not deterministic. * * <p>In order for a {@code Coder} to be considered deterministic, http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java index 1627f8a..87bd531 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java @@ -17,16 +17,9 @@ */ package org.apache.beam.sdk.coders; -import static org.apache.beam.sdk.util.Structs.addString; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import java.io.Serializable; import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.StringUtils; /** * An abstract base class for writing a {@link Coder} class that encodes itself via Java @@ -42,28 +35,6 @@ import org.apache.beam.sdk.util.StringUtils; */ public abstract class CustomCoder<T> extends StructuredCoder<T> implements Serializable { - - @JsonCreator - @Deprecated - public static CustomCoder<?> of( - // N.B. typeId is a required parameter here, since a field named "@type" - // is presented to the deserializer as an input. - // - // If this method did not consume the field, Jackson2 would observe an - // unconsumed field and a returned value of a derived type. So Jackson2 - // would attempt to update the returned value with the unconsumed field - // data, The standard JsonDeserializer does not implement a mechanism for - // updating constructed values, so it would throw an exception, causing - // deserialization to fail. - @JsonProperty(value = "@type", required = false) String typeId, - @JsonProperty(value = "encoding_id", required = false) String encodingId, - @JsonProperty("type") String type, - @JsonProperty("serialized_coder") String serializedCoder) { - return (CustomCoder<?>) SerializableUtils.deserializeFromByteArray( - StringUtils.jsonStringToByteArray(serializedCoder), - type); - } - /** * {@inheritDoc}. * @@ -85,25 +56,6 @@ public abstract class CustomCoder<T> extends StructuredCoder<T> /** * {@inheritDoc} * - * @return A thin {@link CloudObject} wrapping of the Java serialization of {@code this}. - */ - @Override - public final CloudObject initializeCloudObject() { - // N.B. We use the CustomCoder class, not the derived class, since during - // deserialization we will be using the CustomCoder's static factory method - // to construct an instance of the derived class. - CloudObject result = CloudObject.forClass(CustomCoder.class); - addString(result, "type", getClass().getName()); - addString(result, "serialized_coder", - StringUtils.byteArrayToJsonString( - SerializableUtils.serializeToByteArray(this))); - - return result; - } - - /** - * {@inheritDoc} - * * @throws NonDeterministicException a {@link CustomCoder} is presumed * nondeterministic. */ http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java index 273a896..2949ddb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java @@ -18,12 +18,10 @@ package org.apache.beam.sdk.coders; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.sdk.util.Structs.addBoolean; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; -import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeParameter; @@ -70,13 +68,6 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> { } @Override - protected CloudObject initializeCloudObject() { - CloudObject result = CloudObject.forClassName("kind:stream"); - addBoolean(result, PropertyNames.IS_STREAM_LIKE, true); - return result; - } - - @Override public TypeDescriptor<Iterable<T>> getEncodedTypeDescriptor() { return new TypeDescriptor<Iterable<T>>() {}.where( new TypeParameter<T>() {}, getElemCoder().getEncodedTypeDescriptor()); http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java index 3d813b6..b10db3a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.coders; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.sdk.util.Structs.addBoolean; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -27,7 +26,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.KV; @@ -123,13 +121,6 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> { } } - @Override - protected CloudObject initializeCloudObject() { - CloudObject result = CloudObject.forClassName("kind:pair"); - addBoolean(result, PropertyNames.IS_PAIR_LIKE, true); - return result; - } - /** * Returns whether both keyCoder and valueCoder are considered not expensive. */ http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java index b73fb7f..be26531 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java @@ -30,7 +30,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.List; import javax.annotation.Nullable; -import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.VarInt; @@ -67,11 +66,6 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> { } @Override - protected CloudObject initializeCloudObject() { - return CloudObject.forClassName("kind:length_prefix"); - } - - @Override public void encode(T value, OutputStream outStream, Context context) throws CoderException, IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java index bce382c..0cd53b0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java @@ -17,17 +17,12 @@ */ package org.apache.beam.sdk.coders; -import static org.apache.beam.sdk.util.Structs.addList; - import com.google.common.io.ByteStreams; import com.google.common.io.CountingOutputStream; import java.io.ByteArrayOutputStream; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.TypeDescriptor; @@ -104,44 +99,6 @@ public abstract class StructuredCoder<T> implements Coder<T> { } /** - * Adds the following properties to the {@link CloudObject} representation: - * <ul> - * <li>component_encodings: A list of coders represented as {@link CloudObject}s - * equivalent to the {@link #getCoderArguments}.</li> - * </ul> - * - * <p>{@link StructuredCoder} implementations should override {@link #initializeCloudObject} - * to customize the {@link CloudObject} representation. - */ - @Override - public final CloudObject asCloudObject() { - CloudObject result = initializeCloudObject(); - - List<? extends Coder<?>> components = getComponents(); - if (!components.isEmpty()) { - List<CloudObject> cloudComponents = new ArrayList<>(components.size()); - for (Coder<?> coder : components) { - cloudComponents.add(coder.asCloudObject()); - } - addList(result, PropertyNames.COMPONENT_ENCODINGS, cloudComponents); - } - - return result; - } - - /** - * Subclasses should override this method to customize the {@link CloudObject} - * representation. {@link StructuredCoder#asCloudObject} delegates to this method - * to provide an initial {@link CloudObject}. - * - * <p>The default implementation returns a {@link CloudObject} using - * {@link Object#getClass} for the type. - */ - protected CloudObject initializeCloudObject() { - return CloudObject.forClass(getClass()); - } - - /** * {@inheritDoc} * * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver} http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java index bd6d86a..6e0e264 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java @@ -42,7 +42,6 @@ import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.Serializer; import org.apache.beam.sdk.util.UnownedInputStream; import org.apache.beam.sdk.util.UnownedOutputStream; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; @@ -352,7 +351,7 @@ public class CoderProperties { static <T> byte[] encode( Coder<T> coder, Coder.Context context, T value) throws CoderException, IOException { @SuppressWarnings("unchecked") - Coder<T> deserializedCoder = Serializer.deserialize(coder.asCloudObject(), Coder.class); + Coder<T> deserializedCoder = SerializableUtils.clone(coder); ByteArrayOutputStream os = new ByteArrayOutputStream(); deserializedCoder.encode(value, new UnownedOutputStream(os), context); @@ -363,7 +362,7 @@ public class CoderProperties { static <T> T decode( Coder<T> coder, Coder.Context context, byte[] bytes) throws CoderException, IOException { @SuppressWarnings("unchecked") - Coder<T> deserializedCoder = Serializer.deserialize(coder.asCloudObject(), Coder.class); + Coder<T> deserializedCoder = SerializableUtils.clone(coder); byte[] buffer; if (context == Coder.Context.NESTED) { http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java index 79c9352..0276ba6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java @@ -22,7 +22,6 @@ import java.io.OutputStream; import java.util.Collections; import java.util.List; import org.apache.beam.sdk.coders.StructuredCoder; -import org.apache.beam.sdk.util.CloudObject; import org.joda.time.Duration; import org.joda.time.Instant; @@ -84,11 +83,6 @@ public class GlobalWindow extends BoundedWindow { } @Override - protected CloudObject initializeCloudObject() { - return CloudObject.forClassName("kind:global_window"); - } - - @Override public final List<org.apache.beam.sdk.coders.Coder<?>> getCoderArguments() { return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java index 55bf585..fd2a2d8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.DurationCoder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StructuredCoder; -import org.apache.beam.sdk.util.CloudObject; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.ReadableDuration; @@ -216,10 +215,5 @@ public class IntervalWindow extends BoundedWindow public List<? extends Coder<?>> getCoderArguments() { return Collections.emptyList(); } - - @Override - protected CloudObject initializeCloudObject() { - return CloudObject.forClassName("kind:interval_window"); - } } } http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 6b3218e..d4bfd0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -98,31 +98,21 @@ public class SerializableUtils { * <p>Throws a RuntimeException if serialized Coder cannot be deserialized, or * if the deserialized instance is not equal to the original. * - * @return the serialized Coder, as a {@link CloudObject} + * @return the deserialized Coder */ - public static CloudObject ensureSerializable(Coder<?> coder) { + public static Coder<?> ensureSerializable(Coder<?> coder) { // Make sure that Coders are java serializable as well since // they are regularly captured within DoFn's. Coder<?> copy = (Coder<?>) ensureSerializable((Serializable) coder); - CloudObject cloudObject = copy.asCloudObject(); - - Coder<?> decoded; - try { - decoded = Serializer.deserialize(cloudObject, Coder.class); - } catch (RuntimeException e) { - throw new RuntimeException( - String.format("Unable to deserialize Coder: %s. " - + "Check that a suitable constructor is defined. " - + "See Coder for details.", coder), e - ); - } - checkState(coder.equals(decoded), + checkState( + coder.equals(copy), "Coder not equal to original after serialization, indicating that the Coder may not " - + "implement serialization correctly. Before: %s, after: %s, cloud encoding: %s", - coder, decoded, cloudObject); + + "implement serialization correctly. Before: %s, after: %s", + coder, + copy); - return cloudObject; + return copy; } /** http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index fc9a404..23666ca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.sdk.util.Structs.addBoolean; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -696,13 +695,6 @@ public abstract class WindowedValue<T> { } @Override - public CloudObject initializeCloudObject() { - CloudObject result = CloudObject.forClassName("kind:windowed_value"); - addBoolean(result, PropertyNames.IS_WRAPPER, true); - return result; - } - - @Override public List<? extends Coder<?>> getCoderArguments() { return null; } @@ -770,13 +762,6 @@ public abstract class WindowedValue<T> { } @Override - public CloudObject initializeCloudObject() { - CloudObject result = CloudObject.forClass(getClass()); - addBoolean(result, PropertyNames.IS_WRAPPER, true); - return result; - } - - @Override public List<? extends Coder<?>> getCoderArguments() { return Arrays.<Coder<?>>asList(valueCoder); } http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java index 80c3a25..1e56135 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java @@ -21,7 +21,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.Collections; @@ -29,9 +28,7 @@ import java.util.LinkedList; import java.util.List; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.Structs; import org.apache.beam.sdk.values.TypeDescriptor; import org.junit.Rule; import org.junit.Test; @@ -52,13 +49,6 @@ public class IterableCoderTest { new LinkedList<>(Arrays.asList(7, 6, 5))); @Test - public void testCloudObjectRepresentation() throws Exception { - CloudObject cloudObject = TEST_CODER.asCloudObject(); - assertEquals("kind:stream", cloudObject.getClassName()); - assertTrue(Structs.getBoolean(cloudObject, "is_stream_like")); - } - - @Test public void testCoderIsSerializableWithWellKnownCoderType() throws Exception { CoderProperties.coderSerializable(ListCoder.of(GlobalWindow.Coder.INSTANCE)); } http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java index 4aa3d27..56ee403 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java @@ -18,18 +18,14 @@ package org.apache.beam.sdk.coders; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.Collections; import java.util.List; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.Structs; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptor; import org.junit.Rule; @@ -101,13 +97,6 @@ public class KvCoderTest { private static final Coder<KV<String, Integer>> TEST_CODER = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); - @Test - public void testCloudObjectRepresentation() throws Exception { - CloudObject cloudObject = TEST_CODER.asCloudObject(); - assertEquals("kind:pair", cloudObject.getClassName()); - assertTrue(Structs.getBoolean(cloudObject, "is_pair_like")); - } - private static final List<KV<String, Integer>> TEST_VALUES = Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE)); http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java index 1857332..314b969 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java @@ -23,7 +23,6 @@ import com.google.common.io.ByteStreams; import com.google.common.io.CountingOutputStream; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.util.CloudObject; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -32,12 +31,6 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class GlobalWindowTest { @Test - public void testCoderCloudObjectRepresentation() throws Exception { - CloudObject cloudObject = GlobalWindow.Coder.INSTANCE.asCloudObject(); - assertEquals("kind:global_window", cloudObject.getClassName()); - } - - @Test public void testCoderBinaryRepresentation() throws Exception { CountingOutputStream out = new CountingOutputStream(ByteStreams.nullOutputStream()); GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.OUTER); http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index 8bfdcef..70ec8b0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -20,9 +20,7 @@ package org.apache.beam.sdk.util; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -68,14 +66,6 @@ public class WindowedValueTest { } @Test - public void testWindowedValueCoderCloudObjectRepresentation() throws Exception { - CloudObject cloudObject = WindowedValue.getFullCoder( - StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE).asCloudObject(); - assertEquals("kind:windowed_value", cloudObject.getClassName()); - assertTrue(Structs.getBoolean(cloudObject, "is_wrapper")); - } - - @Test public void testFullWindowedValueCoderIsSerializableWithWellKnownCoderType() { CoderProperties.coderSerializable(WindowedValue.getFullCoder( GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE)); http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/extensions/protobuf/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/pom.xml b/sdks/java/extensions/protobuf/pom.xml index 81fb4de..c331b07 100644 --- a/sdks/java/extensions/protobuf/pom.xml +++ b/sdks/java/extensions/protobuf/pom.xml @@ -73,16 +73,6 @@ <artifactId>guava</artifactId> </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - </dependency> - - <dependency> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </dependency> - <!-- build dependencies --> <dependency> <groupId>com.google.auto.service</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/94950484/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java index 8e90a5f..9577c6e 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java @@ -19,10 +19,7 @@ package org.apache.beam.sdk.extensions.protobuf; import static com.google.common.base.Preconditions.checkArgument; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.protobuf.ExtensionRegistry; import com.google.protobuf.Message; @@ -34,12 +31,10 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.Arrays; -import java.util.List; import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import javax.annotation.Nullable; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -280,30 +275,6 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> { this.extensionHostClasses = extensionHostClasses; } - /** - * @deprecated For JSON deserialization only. - */ - @JsonCreator - @Deprecated - public static <T extends Message> ProtoCoder<T> of( - @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName, - @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) { - - try { - @SuppressWarnings("unchecked") - Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName); - List<Class<?>> extensionHostClasses = Lists.newArrayList(); - if (extensionHostClassNames != null) { - for (String extensionHostClassName : extensionHostClassNames) { - extensionHostClasses.add(Class.forName(extensionHostClassName)); - } - } - return of(protoMessageClass).withExtensionsFrom(extensionHostClasses); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException(e); - } - } - /** Get the memoized {@link Parser}, possibly initializing it lazily. */ private Parser<T> getParser() { if (memoizedParser == null) {