Swap to use initializeCloudObject as customization point for CloudObjects.
Hide StandardCoder#getComponents() and have coders only rely on 
Coder#getCoderArguments()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b76d3dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b76d3dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b76d3dc

Branch: refs/heads/master
Commit: 1b76d3dc18a1367d2530fc870e8cb3046cdc714f
Parents: 3de4108
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 29 13:39:45 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/internal/IsmFormat.java | 14 +++++++-------
 .../beam/runners/spark/coders/WritableCoder.java  |  4 ++--
 .../org/apache/beam/sdk/coders/AtomicCoder.java   |  2 +-
 .../org/apache/beam/sdk/coders/AvroCoder.java     |  4 ++--
 .../org/apache/beam/sdk/coders/CustomCoder.java   | 18 +-----------------
 .../org/apache/beam/sdk/coders/IterableCoder.java |  4 ++--
 .../org/apache/beam/sdk/coders/JAXBCoder.java     |  4 ++--
 .../java/org/apache/beam/sdk/coders/KvCoder.java  |  4 ++--
 .../apache/beam/sdk/coders/LengthPrefixCoder.java |  2 ++
 .../apache/beam/sdk/coders/SerializableCoder.java |  4 ++--
 .../org/apache/beam/sdk/coders/StandardCoder.java | 12 ++++++++++--
 .../beam/sdk/coders/protobuf/ProtoCoder.java      |  8 ++++----
 .../org/apache/beam/sdk/transforms/Combine.java   | 11 +++--------
 .../beam/sdk/transforms/join/CoGbkResult.java     | 13 +++----------
 .../org/apache/beam/sdk/util/WindowedValue.java   | 12 ++++++------
 .../org/apache/beam/sdk/coders/KvCoderTest.java   |  5 +++++
 .../beam/sdk/util/SerializableUtilsTest.java      |  4 ++--
 .../apache/beam/sdk/io/hdfs/AvroWrapperCoder.java |  4 ++--
 .../apache/beam/sdk/io/hdfs/WritableCoder.java    |  4 ++--
 19 files changed, 60 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 6a244b0..5b733c8 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -125,7 +125,7 @@ public class IsmFormat {
       checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key 
components.");
       checkArgument(!isMetadataKey(keyComponents),
           "Expected key components to not contain metadata key.");
-      return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, value, null);
+      return new AutoValue_IsmFormat_IsmRecord<>(keyComponents, value, null);
     }
 
     public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] 
metadata) {
@@ -133,7 +133,7 @@ public class IsmFormat {
       checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key 
components.");
       checkArgument(isMetadataKey(keyComponents),
           "Expected key components to contain metadata key.");
-      return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, null, 
metadata);
+      return new AutoValue_IsmFormat_IsmRecord<>(keyComponents, null, 
metadata);
     }
 
     /** Returns the list of key components. */
@@ -379,11 +379,11 @@ public class IsmFormat {
     }
 
     @Override
-    public CloudObject asCloudObject() {
-      CloudObject cloudObject = super.asCloudObject();
-      addLong(cloudObject, PropertyNames.NUM_SHARD_CODERS, 
numberOfShardKeyCoders);
-      addLong(cloudObject, PropertyNames.NUM_METADATA_SHARD_CODERS, 
numberOfMetadataShardKeyCoders);
-      return cloudObject;
+    protected CloudObject initializeCloudObject() {
+      CloudObject result = CloudObject.forClass(getClass());
+      addLong(result, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
+      addLong(result, PropertyNames.NUM_METADATA_SHARD_CODERS, 
numberOfMetadataShardKeyCoders);
+      return result;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
index e63c660..40c2627 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
@@ -107,8 +107,8 @@ public class WritableCoder<T extends Writable> extends 
StandardCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  protected CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     result.put("type", type.getName());
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
index 60908fa..c024f89 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
@@ -32,7 +32,7 @@ public abstract class AtomicCoder<T> extends 
DeterministicStandardCoder<T> {
   protected AtomicCoder() { }
 
   @Override
-  public List<Coder<?>> getCoderArguments() {
+  public final List<Coder<?>> getCoderArguments() {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 41afdc6..eee0906 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -327,8 +327,8 @@ public class AvroCoder<T> extends StandardCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  protected CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     addString(result, "type", type.getName());
     addString(result, "schema", schemaSupplier.get().toString());
     return result;

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index 2614cc1..59d29de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -17,17 +17,12 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.sdk.util.Structs.addString;
-import static org.apache.beam.sdk.util.Structs.addStringList;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Lists;
 import java.io.Serializable;
-import java.util.Collection;
 import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.StringUtils;
 
@@ -72,7 +67,7 @@ public abstract class CustomCoder<T> extends AtomicCoder<T>
    * @return A thin {@link CloudObject} wrapping of the Java serialization of 
{@code this}.
    */
   @Override
-  public CloudObject asCloudObject() {
+  public CloudObject initializeCloudObject() {
     // N.B. We use the CustomCoder class, not the derived class, since during
     // deserialization we will be using the CustomCoder's static factory method
     // to construct an instance of the derived class.
@@ -82,17 +77,6 @@ public abstract class CustomCoder<T> extends AtomicCoder<T>
         StringUtils.byteArrayToJsonString(
             SerializableUtils.serializeToByteArray(this)));
 
-    String encodingId = getEncodingId();
-    checkNotNull(encodingId, "Coder.getEncodingId() must not return null.");
-    if (!encodingId.isEmpty()) {
-      addString(result, PropertyNames.ENCODING_ID, encodingId);
-    }
-
-    Collection<String> allowedEncodings = getAllowedEncodings();
-    if (!allowedEncodings.isEmpty()) {
-      addStringList(result, PropertyNames.ALLOWED_ENCODINGS, 
Lists.newArrayList(allowedEncodings));
-    }
-
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index 11fb172..cc6b970 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -68,8 +68,8 @@ public class IterableCoder<T> extends IterableLikeCoder<T, 
Iterable<T>> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  protected CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
index 748b07d..7afd225 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
@@ -167,8 +167,8 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  protected CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     Structs.addString(result, JAXB_CLASS, jaxbClass.getName());
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index c0d3aa4..1e70a30 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -122,8 +122,8 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  protected CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index dd9af32..7319200 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -65,6 +65,8 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> {
     this.valueCoder = valueCoder;
   }
 
+
+
   @Override
   public void encode(T value, OutputStream outStream, Context context)
       throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index 46777b9..de7cea8 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -144,8 +144,8 @@ public class SerializableCoder<T extends Serializable> 
extends AtomicCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  public CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     result.put("type", type.getName());
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
index 0e57ed2..c17c376 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
@@ -120,9 +120,13 @@ public abstract class StandardCoder<T> implements Coder<T> 
{
     return builder.toString();
   }
 
+  /**
+   * {@link StandardCoder} implementations should override {@link 
#initializeCloudObject}
+   * if the default {@link CloudObject} representation needs to change.
+   */
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
+  public final CloudObject asCloudObject() {
+    CloudObject result = initializeCloudObject();
 
     List<? extends Coder<?>> components = getComponents();
     if (!components.isEmpty()) {
@@ -147,6 +151,10 @@ public abstract class StandardCoder<T> implements Coder<T> 
{
     return result;
   }
 
+  protected CloudObject initializeCloudObject() {
+    return CloudObject.forClass(getClass());
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
index 9bba42b..a5f53ff 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
@@ -124,7 +124,7 @@ public class ProtoCoder<T extends Message> extends 
AtomicCoder<T> {
    * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link 
Message}.
    */
   public static <T extends Message> ProtoCoder<T> of(Class<T> 
protoMessageClass) {
-    return new ProtoCoder<T>(protoMessageClass, ImmutableSet.<Class<?>>of());
+    return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of());
   }
 
   /**
@@ -162,7 +162,7 @@ public class ProtoCoder<T extends Message> extends 
AtomicCoder<T> {
       }
     }
 
-    return new ProtoCoder<T>(
+    return new ProtoCoder<>(
         protoMessageClass,
         new ImmutableSet.Builder<Class<?>>()
             .addAll(extensionHostClasses)
@@ -337,8 +337,8 @@ public class ProtoCoder<T extends Message> extends 
AtomicCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  public CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     Structs.addString(result, PROTO_MESSAGE_CLASS, 
protoMessageClass.getName());
     List<CloudObject> extensionHostClassNames = Lists.newArrayList();
     for (String className : getSortedExtensionClasses()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 92c04ca..98a7bec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -27,7 +27,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
@@ -43,6 +42,7 @@ import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -654,11 +654,6 @@ public class Combine {
     }
 
     @Override
-    public List<Coder<?>> getCoderArguments() {
-      return Arrays.<Coder<?>>asList(valueCoder);
-    }
-
-    @Override
     public void encode(Holder<V> accumulator, OutputStream outStream, Context 
context)
         throws CoderException, IOException {
       if (accumulator.present) {
@@ -2225,11 +2220,11 @@ public class Combine {
       }
 
       public static <InputT, AccumT> InputOrAccum<InputT, AccumT> input(InputT 
input) {
-        return new InputOrAccum<InputT, AccumT>(input, null);
+        return new InputOrAccum<>(input, null);
       }
 
       public static <InputT, AccumT> InputOrAccum<InputT, AccumT> accum(AccumT 
aggr) {
-        return new InputOrAccum<InputT, AccumT>(null, aggr);
+        return new InputOrAccum<>(null, aggr);
       }
 
       private static class InputOrAccumCoder<InputT, AccumT>

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 7b849e7..9e0a011 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -242,20 +241,14 @@ public class CoGbkResult {
       this.unionCoder = unionCoder;
     }
 
-
     @Override
     public List<? extends Coder<?>> getCoderArguments() {
-      return null;
-    }
-
-    @Override
-    public List<? extends Coder<?>> getComponents() {
-      return Arrays.<Coder<?>>asList(unionCoder);
+      return ImmutableList.of(unionCoder);
     }
 
     @Override
-    public CloudObject asCloudObject() {
-      CloudObject result = super.asCloudObject();
+    public CloudObject initializeCloudObject() {
+      CloudObject result = CloudObject.forClass(getClass());
       addObject(result, PropertyNames.CO_GBK_RESULT_SCHEMA, 
schema.asCloudObject());
       return result;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 1b3e648..ce13317 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -131,7 +131,7 @@ public abstract class WindowedValue<T> {
    */
   @Deprecated
   public static <T> WindowedValue<T> valueInEmptyWindows(T value) {
-    return new ValueInEmptyWindows<T>(value, PaneInfo.NO_FIRING);
+    return new ValueInEmptyWindows<>(value, PaneInfo.NO_FIRING);
   }
 
   /**
@@ -143,7 +143,7 @@ public abstract class WindowedValue<T> {
    */
   @Deprecated
   public static <T> WindowedValue<T> valueInEmptyWindows(T value, PaneInfo 
pane) {
-    return new ValueInEmptyWindows<T>(value, pane);
+    return new ValueInEmptyWindows<>(value, pane);
   }
 
   /**
@@ -696,8 +696,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public CloudObject asCloudObject() {
-      CloudObject result = super.asCloudObject();
+    public CloudObject initializeCloudObject() {
+      CloudObject result = CloudObject.forClass(getClass());
       addBoolean(result, PropertyNames.IS_WRAPPER, true);
       return result;
     }
@@ -770,8 +770,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public CloudObject asCloudObject() {
-      CloudObject result = super.asCloudObject();
+    public CloudObject initializeCloudObject() {
+      CloudObject result = CloudObject.forClass(getClass());
       addBoolean(result, PropertyNames.IS_WRAPPER, true);
       return result;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
index 436e227..4c07c83 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
@@ -95,6 +95,11 @@ public class KvCoderTest {
   private static final Coder<KV<String, Integer>> TEST_CODER =
       KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
 
+  @Test
+  public void testEnc() {
+    System.out.println(TEST_CODER.asCloudObject());
+  }
+
   private static final List<KV<String, Integer>> TEST_VALUES =
       Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", 
Integer.MAX_VALUE));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
index 5435a45..9f86ed2 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
@@ -129,8 +129,8 @@ public class SerializableUtilsTest {
     }
 
     @Override
-    public CloudObject asCloudObject() {
-      CloudObject result = super.asCloudObject();
+    public CloudObject initializeCloudObject() {
+      CloudObject result = CloudObject.forClass(getClass());
       result.put("unserializableField", unserializableField);
       return result;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
index 45a8037..7e01846 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
@@ -100,8 +100,8 @@ public class AvroWrapperCoder<WrapperT extends 
AvroWrapper<DatumT>, DatumT>
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  public CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     result.put("wrapperType", wrapperType.getName());
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
index 96ba87a..637e686 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
@@ -101,8 +101,8 @@ public class WritableCoder<T extends Writable> extends 
StandardCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  public CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     result.put("type", type.getName());
     return result;
   }

Reply via email to