Repository: incubator-beam Updated Branches: refs/heads/master afa0c31bd -> bfc527d63
Changes in AvroCoder serialization so it can serialize in Kryo Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06c18468 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06c18468 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06c18468 Branch: refs/heads/master Commit: 06c1846860176cc2bd971f8ad7037c97594af866 Parents: afa0c31 Author: Aviem Zur <aviem...@gmail.com> Authored: Thu Sep 8 11:21:41 2016 +0300 Committer: Luke Cwik <lc...@google.com> Committed: Tue Nov 8 07:47:34 2016 -0800 ---------------------------------------------------------------------- sdks/java/core/pom.xml | 7 ++ .../org/apache/beam/sdk/coders/AvroCoder.java | 126 +++++++++++-------- .../apache/beam/sdk/coders/AvroCoderTest.java | 33 +++++ 3 files changed, 112 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 17ef193..c7b46d8 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -473,5 +473,12 @@ <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>2.21</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 7894d14..4f0239e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.Serializable; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collection; @@ -164,7 +163,9 @@ public class AvroCoder<T> extends StandardCoder<T> { }; private final Class<T> type; - private final transient Schema schema; + private transient Schema schema; + + private final String schemaStr; private final List<String> nonDeterministicReasons; @@ -174,36 +175,16 @@ public class AvroCoder<T> extends StandardCoder<T> { // Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe, // these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use // an inner coder. - private final transient ThreadLocal<BinaryDecoder> decoder; - private final transient ThreadLocal<BinaryEncoder> encoder; - private final transient ThreadLocal<DatumWriter<T>> writer; - private final transient ThreadLocal<DatumReader<T>> reader; + private transient ThreadLocal<BinaryDecoder> memoizedDecoder; + private transient ThreadLocal<BinaryEncoder> memoizedEncoder; + private transient ThreadLocal<DatumWriter<T>> memoizedWriter; + private transient ThreadLocal<DatumReader<T>> memoizedReader; protected AvroCoder(Class<T> type, Schema schema) { this.type = type; this.schema = schema; - + this.schemaStr = schema.toString(); nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema); - - // Decoder and Encoder start off null for each thread. They are allocated and potentially - // reused inside encode/decode. - this.decoder = new ThreadLocal<>(); - this.encoder = new ThreadLocal<>(); - - // Reader and writer are allocated once per thread and are "final" for thread-local Coder - // instance. - this.reader = new ThreadLocal<DatumReader<T>>() { - @Override - public DatumReader<T> initialValue() { - return createDatumReader(); - } - }; - this.writer = new ThreadLocal<DatumWriter<T>>() { - @Override - public DatumWriter<T> initialValue() { - return createDatumWriter(); - } - }; } /** @@ -246,33 +227,29 @@ public class AvroCoder<T> extends StandardCoder<T> { return type; } - private Object writeReplace() { - // When serialized by Java, instances of AvroCoder should be replaced by - // a SerializedAvroCoderProxy. - return new SerializedAvroCoderProxy<>(type, schema.toString()); - } - @Override public void encode(T value, OutputStream outStream, Context context) throws IOException { // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it. + ThreadLocal<BinaryEncoder> encoder = getEncoder(); BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get()); // Save the potentially-new instance for reuse later. encoder.set(encoderInstance); - writer.get().write(value, encoderInstance); + getWriter().get().write(value, encoderInstance); // Direct binary encoder does not buffer any data and need not be flushed. } @Override public T decode(InputStream inStream, Context context) throws IOException { // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it. + ThreadLocal<BinaryDecoder> decoder = getDecoder(); BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get()); // Save the potentially-new instance for later. decoder.set(decoderInstance); - return reader.get().read(null, decoderInstance); + return getReader().get().read(null, decoderInstance); } @Override - public List<? extends Coder<?>> getCoderArguments() { + public List<? extends Coder<?>> getCoderArguments() { return null; } @@ -280,7 +257,7 @@ public class AvroCoder<T> extends StandardCoder<T> { public CloudObject asCloudObject() { CloudObject result = super.asCloudObject(); addString(result, "type", type.getName()); - addString(result, "schema", schema.toString()); + addString(result, "schema", getSchema().toString()); return result; } @@ -306,9 +283,9 @@ public class AvroCoder<T> extends StandardCoder<T> { @Deprecated public DatumReader<T> createDatumReader() { if (type.equals(GenericRecord.class)) { - return new GenericDatumReader<>(schema); + return new GenericDatumReader<>(getSchema()); } else { - return new ReflectDatumReader<>(schema); + return new ReflectDatumReader<>(getSchema()); } } @@ -321,9 +298,9 @@ public class AvroCoder<T> extends StandardCoder<T> { @Deprecated public DatumWriter<T> createDatumWriter() { if (type.equals(GenericRecord.class)) { - return new GenericDatumWriter<>(schema); + return new GenericDatumWriter<>(getSchema()); } else { - return new ReflectDatumWriter<>(schema); + return new ReflectDatumWriter<>(getSchema()); } } @@ -331,28 +308,69 @@ public class AvroCoder<T> extends StandardCoder<T> { * Returns the schema used by this coder. */ public Schema getSchema() { - return schema; + return getMemoizedSchema(); + } + + /** + * Get the memoized {@link BinaryDecoder}, possibly initializing it lazily. + */ + private ThreadLocal<BinaryDecoder> getDecoder() { + if (memoizedDecoder == null) { + memoizedDecoder = new ThreadLocal<>(); + } + return memoizedDecoder; + } + + /** + * Get the memoized {@link BinaryEncoder}, possibly initializing it lazily. + */ + private ThreadLocal<BinaryEncoder> getEncoder() { + if (memoizedEncoder == null) { + memoizedEncoder = new ThreadLocal<>(); + } + return memoizedEncoder; } /** - * Proxy to use in place of serializing the {@link AvroCoder}. This allows the fields - * to remain final. + * Get the memoized {@link DatumReader}, possibly initializing it lazily. */ - private static class SerializedAvroCoderProxy<T> implements Serializable { - private final Class<T> type; - private final String schemaStr; + private ThreadLocal<DatumReader<T>> getReader() { + if (memoizedReader == null) { + memoizedReader = new ThreadLocal<DatumReader<T>>() { + @Override + public DatumReader<T> initialValue() { + return createDatumReader(); + } + }; + } + return memoizedReader; + } - public SerializedAvroCoderProxy(Class<T> type, String schemaStr) { - this.type = type; - this.schemaStr = schemaStr; + /** + * Get the memoized {@link DatumWriter}, possibly initializing it lazily. + */ + private ThreadLocal<DatumWriter<T>> getWriter() { + if (memoizedWriter == null) { + memoizedWriter = new ThreadLocal<DatumWriter<T>>() { + @Override + public DatumWriter<T> initialValue() { + return createDatumWriter(); + } + }; } + return memoizedWriter; + } - private Object readResolve() { - // When deserialized, instances of this object should be replaced by - // constructing an AvroCoder. + /** + * Get the {@link Schema}, possibly initializing it lazily by parsing {@link + * AvroCoder#schemaStr}. + */ + private Schema getMemoizedSchema() { + if (schema == null) { Schema.Parser parser = new Schema.Parser(); - return new AvroCoder<T>(type, parser.parse(schemaStr)); + schema = parser.parse(schemaStr); } + return schema; } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index f6329a0..f2373d1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -39,6 +39,10 @@ import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -172,6 +176,35 @@ public class AvroCoderTest { CoderProperties.coderDecodeEncodeEqual(copied, value); } + /** + * Confirm that we can serialize and deserialize an AvroCoder object using Kryo. + * (BEAM-626). + * + * @throws Exception + */ + @Test + public void testKryoSerialization() throws Exception { + Pojo value = new Pojo("Hello", 42); + AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class); + + //Kryo instantiation + Kryo kryo = new Kryo(); + kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy()); + + //Serialization of object + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Output output = new Output(bos); + kryo.writeObject(output, coder); + output.close(); + + //De-serialization of object + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + Input input = new Input(bis); + AvroCoder<Pojo> copied = (AvroCoder<Pojo>) kryo.readObject(input, AvroCoder.class); + + CoderProperties.coderDecodeEncodeEqual(copied, value); + } + @Test public void testPojoEncoding() throws Exception { Pojo value = new Pojo("Hello", 42);