Swap to use initializeCloudObject as customization point for CloudObjects. Hide StandardCoder#getComponents() and have coders only rely on Coder#getCoderArguments()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b76d3dc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b76d3dc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b76d3dc Branch: refs/heads/master Commit: 1b76d3dc18a1367d2530fc870e8cb3046cdc714f Parents: 3de4108 Author: Luke Cwik <lc...@google.com> Authored: Thu Dec 29 13:39:45 2016 -0800 Committer: Luke Cwik <lc...@google.com> Committed: Tue Jan 3 16:35:37 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/dataflow/internal/IsmFormat.java | 14 +++++++------- .../beam/runners/spark/coders/WritableCoder.java | 4 ++-- .../org/apache/beam/sdk/coders/AtomicCoder.java | 2 +- .../org/apache/beam/sdk/coders/AvroCoder.java | 4 ++-- .../org/apache/beam/sdk/coders/CustomCoder.java | 18 +----------------- .../org/apache/beam/sdk/coders/IterableCoder.java | 4 ++-- .../org/apache/beam/sdk/coders/JAXBCoder.java | 4 ++-- .../java/org/apache/beam/sdk/coders/KvCoder.java | 4 ++-- .../apache/beam/sdk/coders/LengthPrefixCoder.java | 2 ++ .../apache/beam/sdk/coders/SerializableCoder.java | 4 ++-- .../org/apache/beam/sdk/coders/StandardCoder.java | 12 ++++++++++-- .../beam/sdk/coders/protobuf/ProtoCoder.java | 8 ++++---- .../org/apache/beam/sdk/transforms/Combine.java | 11 +++-------- .../beam/sdk/transforms/join/CoGbkResult.java | 13 +++---------- .../org/apache/beam/sdk/util/WindowedValue.java | 12 ++++++------ .../org/apache/beam/sdk/coders/KvCoderTest.java | 5 +++++ .../beam/sdk/util/SerializableUtilsTest.java | 4 ++-- .../apache/beam/sdk/io/hdfs/AvroWrapperCoder.java | 4 ++-- .../apache/beam/sdk/io/hdfs/WritableCoder.java | 4 ++-- 19 files changed, 60 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index 6a244b0..5b733c8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -125,7 +125,7 @@ public class IsmFormat { checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components."); checkArgument(!isMetadataKey(keyComponents), "Expected key components to not contain metadata key."); - return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, value, null); + return new AutoValue_IsmFormat_IsmRecord<>(keyComponents, value, null); } public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] metadata) { @@ -133,7 +133,7 @@ public class IsmFormat { checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components."); checkArgument(isMetadataKey(keyComponents), "Expected key components to contain metadata key."); - return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, null, metadata); + return new AutoValue_IsmFormat_IsmRecord<>(keyComponents, null, metadata); } /** Returns the list of key components. */ @@ -379,11 +379,11 @@ public class IsmFormat { } @Override - public CloudObject asCloudObject() { - CloudObject cloudObject = super.asCloudObject(); - addLong(cloudObject, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders); - addLong(cloudObject, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders); - return cloudObject; + protected CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); + addLong(result, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders); + addLong(result, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders); + return result; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java index e63c660..40c2627 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java @@ -107,8 +107,8 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> { } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + protected CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); result.put("type", type.getName()); return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java index 60908fa..c024f89 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java @@ -32,7 +32,7 @@ public abstract class AtomicCoder<T> extends DeterministicStandardCoder<T> { protected AtomicCoder() { } @Override - public List<Coder<?>> getCoderArguments() { + public final List<Coder<?>> getCoderArguments() { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 41afdc6..eee0906 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -327,8 +327,8 @@ public class AvroCoder<T> extends StandardCoder<T> { } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + protected CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); addString(result, "type", type.getName()); addString(result, "schema", schemaSupplier.get().toString()); return result; http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java index 2614cc1..59d29de 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java @@ -17,17 +17,12 @@ */ package org.apache.beam.sdk.coders; -import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.sdk.util.Structs.addString; -import static org.apache.beam.sdk.util.Structs.addStringList; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Lists; import java.io.Serializable; -import java.util.Collection; import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.StringUtils; @@ -72,7 +67,7 @@ public abstract class CustomCoder<T> extends AtomicCoder<T> * @return A thin {@link CloudObject} wrapping of the Java serialization of {@code this}. */ @Override - public CloudObject asCloudObject() { + public CloudObject initializeCloudObject() { // N.B. We use the CustomCoder class, not the derived class, since during // deserialization we will be using the CustomCoder's static factory method // to construct an instance of the derived class. @@ -82,17 +77,6 @@ public abstract class CustomCoder<T> extends AtomicCoder<T> StringUtils.byteArrayToJsonString( SerializableUtils.serializeToByteArray(this))); - String encodingId = getEncodingId(); - checkNotNull(encodingId, "Coder.getEncodingId() must not return null."); - if (!encodingId.isEmpty()) { - addString(result, PropertyNames.ENCODING_ID, encodingId); - } - - Collection<String> allowedEncodings = getAllowedEncodings(); - if (!allowedEncodings.isEmpty()) { - addStringList(result, PropertyNames.ALLOWED_ENCODINGS, Lists.newArrayList(allowedEncodings)); - } - return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java index 11fb172..cc6b970 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java @@ -68,8 +68,8 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> { } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + protected CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); addBoolean(result, PropertyNames.IS_STREAM_LIKE, true); return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java index 748b07d..7afd225 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java @@ -167,8 +167,8 @@ public class JAXBCoder<T> extends AtomicCoder<T> { } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + protected CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); Structs.addString(result, JAXB_CLASS, jaxbClass.getName()); return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java index c0d3aa4..1e70a30 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java @@ -122,8 +122,8 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> { } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + protected CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); addBoolean(result, PropertyNames.IS_PAIR_LIKE, true); return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java index dd9af32..7319200 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java @@ -65,6 +65,8 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> { this.valueCoder = valueCoder; } + + @Override public void encode(T value, OutputStream outStream, Context context) throws CoderException, IOException { http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java index 46777b9..de7cea8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java @@ -144,8 +144,8 @@ public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> { } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); result.put("type", type.getName()); return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java index 0e57ed2..c17c376 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java @@ -120,9 +120,13 @@ public abstract class StandardCoder<T> implements Coder<T> { return builder.toString(); } + /** + * {@link StandardCoder} implementations should override {@link #initializeCloudObject} + * if the default {@link CloudObject} representation needs to change. + */ @Override - public CloudObject asCloudObject() { - CloudObject result = CloudObject.forClass(getClass()); + public final CloudObject asCloudObject() { + CloudObject result = initializeCloudObject(); List<? extends Coder<?>> components = getComponents(); if (!components.isEmpty()) { @@ -147,6 +151,10 @@ public abstract class StandardCoder<T> implements Coder<T> { return result; } + protected CloudObject initializeCloudObject() { + return CloudObject.forClass(getClass()); + } + /** * {@inheritDoc} * http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java index 9bba42b..a5f53ff 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java @@ -124,7 +124,7 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> { * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}. */ public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) { - return new ProtoCoder<T>(protoMessageClass, ImmutableSet.<Class<?>>of()); + return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of()); } /** @@ -162,7 +162,7 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> { } } - return new ProtoCoder<T>( + return new ProtoCoder<>( protoMessageClass, new ImmutableSet.Builder<Class<?>>() .addAll(extensionHostClasses) @@ -337,8 +337,8 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> { } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName()); List<CloudObject> extensionHostClassNames = Lists.newArrayList(); for (String className : getSortedExtensionClasses()) { http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 92c04ca..98a7bec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -43,6 +42,7 @@ import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; @@ -654,11 +654,6 @@ public class Combine { } @Override - public List<Coder<?>> getCoderArguments() { - return Arrays.<Coder<?>>asList(valueCoder); - } - - @Override public void encode(Holder<V> accumulator, OutputStream outStream, Context context) throws CoderException, IOException { if (accumulator.present) { @@ -2225,11 +2220,11 @@ public class Combine { } public static <InputT, AccumT> InputOrAccum<InputT, AccumT> input(InputT input) { - return new InputOrAccum<InputT, AccumT>(input, null); + return new InputOrAccum<>(input, null); } public static <InputT, AccumT> InputOrAccum<InputT, AccumT> accum(AccumT aggr) { - return new InputOrAccum<InputT, AccumT>(null, aggr); + return new InputOrAccum<>(null, aggr); } private static class InputOrAccumCoder<InputT, AccumT> http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index 7b849e7..9e0a011 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -30,7 +30,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -242,20 +241,14 @@ public class CoGbkResult { this.unionCoder = unionCoder; } - @Override public List<? extends Coder<?>> getCoderArguments() { - return null; - } - - @Override - public List<? extends Coder<?>> getComponents() { - return Arrays.<Coder<?>>asList(unionCoder); + return ImmutableList.of(unionCoder); } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); addObject(result, PropertyNames.CO_GBK_RESULT_SCHEMA, schema.asCloudObject()); return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 1b3e648..ce13317 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -131,7 +131,7 @@ public abstract class WindowedValue<T> { */ @Deprecated public static <T> WindowedValue<T> valueInEmptyWindows(T value) { - return new ValueInEmptyWindows<T>(value, PaneInfo.NO_FIRING); + return new ValueInEmptyWindows<>(value, PaneInfo.NO_FIRING); } /** @@ -143,7 +143,7 @@ public abstract class WindowedValue<T> { */ @Deprecated public static <T> WindowedValue<T> valueInEmptyWindows(T value, PaneInfo pane) { - return new ValueInEmptyWindows<T>(value, pane); + return new ValueInEmptyWindows<>(value, pane); } /** @@ -696,8 +696,8 @@ public abstract class WindowedValue<T> { } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); addBoolean(result, PropertyNames.IS_WRAPPER, true); return result; } @@ -770,8 +770,8 @@ public abstract class WindowedValue<T> { } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); addBoolean(result, PropertyNames.IS_WRAPPER, true); return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java index 436e227..4c07c83 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java @@ -95,6 +95,11 @@ public class KvCoderTest { private static final Coder<KV<String, Integer>> TEST_CODER = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); + @Test + public void testEnc() { + System.out.println(TEST_CODER.asCloudObject()); + } + private static final List<KV<String, Integer>> TEST_VALUES = Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE)); http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java index 5435a45..9f86ed2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java @@ -129,8 +129,8 @@ public class SerializableUtilsTest { } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); result.put("unserializableField", unserializableField); return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java index 45a8037..7e01846 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java @@ -100,8 +100,8 @@ public class AvroWrapperCoder<WrapperT extends AvroWrapper<DatumT>, DatumT> } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); result.put("wrapperType", wrapperType.getName()); return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java index 96ba87a..637e686 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java @@ -101,8 +101,8 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> { } @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); result.put("type", type.getName()); return result; }