[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/4943 ---
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148977803

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.typeutils;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
    +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
    +
    +import org.apache.avro.specific.SpecificRecordBase;
    +
    +import java.io.IOException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * An Avro serializer that can switch back to a KryoSerializer or a Pojo Serializer, if
    + * it has to ensure compatibility with one of those.
    + *
    + * @param <T> The type to be serialized.
    + */
    +public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> {
    +
    +    private static final long serialVersionUID = 1L;
    +
    +    /** The type to serialize. */
    +    private final Class<T> type;
    +
    +    /** The type serializer currently used. Avro by default. */
    +    private TypeSerializer<T> serializer;
    +
    +    /**
    +     * Creates a new backwards-compatible Avro Serializer, for the given type.
    +     */
    +    public BackwardsCompatibleAvroSerializer(Class<T> type) {
    +        this.type = type;
    +        this.serializer = new AvroSerializer<>(type);
    +    }
    +
    +    /**
    +     * Private copy constructor.
    +     */
    +    private BackwardsCompatibleAvroSerializer(Class<T> type, TypeSerializer<T> serializer) {
    +        this.type = type;
    +        this.serializer = serializer;
    +    }
    +
    +    //
    +    // Properties
    +    //
    +
    +    @Override
    +    public boolean isImmutableType() {
    +        return serializer.isImmutableType();
    +    }
    +
    +    @Override
    +    public int getLength() {
    +        return serializer.getLength();
    +    }
    +
    +    //
    +    // Serialization
    +    //
    +
    +    @Override
    +    public T createInstance() {
    +        return serializer.createInstance();
    +    }
    +
    +    @Override
    +    public void serialize(T value, DataOutputView target) throws IOException {
    +        serializer.serialize(value, target);
    +    }
    +
    +    @Override
    +    public T deserialize(DataInputView source) throws IOException {
    +        return serializer.deserialize(source);
    +    }
    +
    +    @Override
    +    public T deserialize(T reuse, DataInputView source) throws IOException {
    +        return serializer.deserialize(reuse, source);
    +    }
    +
    +    //
    +    // Copying
    +    //
    +
    +    @Override
    +    public T copy(T from) {
    +        return serializer.copy(from);
    +    }
    +
    +    @Override
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148977780

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.typeutils;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
    +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
    +
    +import org.apache.avro.specific.SpecificRecordBase;
    +
    +import java.io.IOException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * An Avro serializer that can switch back to a KryoSerializer or a Pojo Serializer, if
    + * it has to ensure compatibility with one of those.
    + *
    + * @param <T> The type to be serialized.
    + */
    +public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> {
    +
    +    private static final long serialVersionUID = 1L;
    +
    +    /** The type to serialize. */
    +    private final Class<T> type;
    +
    +    /** The type serializer currently used. Avro by default. */
    +    private TypeSerializer<T> serializer;
    +
    +    /**
    +     * Creates a new backwards-compatible Avro Serializer, for the given type.
    +     */
    +    public BackwardsCompatibleAvroSerializer(Class<T> type) {
    --- End diff --

    The other constructor is a copy constructor, would rather not call it...

---
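The trade-off in this reply can be sketched in plain Java. The class below is a hypothetical stand-in: `DelegatingWrapper`, `duplicate()` and the `String` delegate are illustrative names, not code from the pull request. It only shows the shape of the argument, assuming the two-argument constructor exists purely to clone state: the public constructor spells out its own default rather than chaining to a constructor whose documented job is copying.

```java
/** Hypothetical stand-in for a wrapping serializer; the delegate is just a name here. */
final class DelegatingWrapper {
    private final Class<?> type;
    private final String delegate;

    /** Public constructor: states its own default explicitly instead of chaining. */
    DelegatingWrapper(Class<?> type) {
        this.type = type;
        this.delegate = "avro";
    }

    /** Private copy constructor: used only by duplicate() to clone existing state. */
    private DelegatingWrapper(Class<?> type, String delegate) {
        this.type = type;
        this.delegate = delegate;
    }

    /** Returns a new instance carrying the same type and delegate. */
    DelegatingWrapper duplicate() {
        return new DelegatingWrapper(type, delegate);
    }

    Class<?> type() {
        return type;
    }

    String delegate() {
        return delegate;
    }
}
```

Chaining with `this(type, "avro")` would behave the same in this sketch; the reply is about keeping the copy constructor's purpose narrow, not about correctness.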
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148977758

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
             return this.reader.read(reuse, this.decoder);
         }
    +    //
    +    // Copying
    +    //
    +
         @Override
    -    public void copy(DataInputView source, DataOutputView target) throws IOException {
    +    public T copy(T from) {
             checkAvroInitialized();
    +        return avroData.deepCopy(schema, from);
    +    }
    -        if (this.deepCopyInstance == null) {
    -            this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
    -        }
    -
    -        this.decoder.setIn(source);
    -        this.encoder.setOut(target);
    +    @Override
    +    public T copy(T from, T reuse) {
    +        return copy(from);
    +    }
    -        T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
    -        this.writer.write(tmp, this.encoder);
    +    @Override
    +    public void copy(DataInputView source, DataOutputView target) throws IOException {
    +        T value = deserialize(source);
    +        serialize(value, target);
         }
    -    private void checkAvroInitialized() {
    -        if (this.reader == null) {
    -            this.reader = new ReflectDatumReader(type);
    -            this.writer = new ReflectDatumWriter(type);
    -            this.encoder = new DataOutputEncoder();
    -            this.decoder = new DataInputDecoder();
    +    //
    +    // Compatibility and Upgrades
    +    //
    +
    +    @Override
    +    public TypeSerializerConfigSnapshot snapshotConfiguration() {
    +        if (configSnapshot == null) {
    +            checkAvroInitialized();
    +            configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false));
             }
    +        return configSnapshot;
         }
    -    private void checkKryoInitialized() {
    -        if (this.kryo == null) {
    -            this.kryo = new Kryo();
    -
    -            Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
    -            instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
    -            kryo.setInstantiatorStrategy(instantiatorStrategy);
    +    @Override
    +    @SuppressWarnings("deprecation")
    +    public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    +        if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) {
    +            // proper schema snapshot, can do the sophisticated schema-based compatibility check
    +            final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
    +            final Schema lastSchema = new Schema.Parser().parse(schemaString);
    -            kryo.setAsmEnabled(true);
    +            final SchemaPairCompatibility compatibility =
    +                    SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
    -            KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
    +            return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ?
    +                    CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
    +        }
    +        else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
    +            // old snapshot case, just compare the type
    +            // we don't need to restore any Kryo stuff, since Kryo was never used for persistence,
    +            // only for object-to-object copies.
    +            final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot;
    +            return type.equals(old.getTypeClass()) ?
    +                    CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
    +        }
    +        else {
    +            return CompatibilityResult.requiresMigration();
             }
         }
    -    //
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148928504

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924545

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---
    @@ -0,0 +1,215 @@
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924463

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924503

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.typeutils;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
    +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
    +
    +import org.apache.avro.specific.SpecificRecordBase;
    +
    +import java.io.IOException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * An Avro serializer that can switch back to a KryoSerializer or a Pojo Serializer, if
    + * it has to ensure compatibility with one of those.
    + *
    + * @param <T> The type to be serialized.
    + */
    +public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> {
    +
    +    private static final long serialVersionUID = 1L;
    +
    +    /** The type to serialize. */
    +    private final Class<T> type;
    +
    +    /** The type serializer currently used. Avro by default. */
    +    private TypeSerializer<T> serializer;
    +
    +    /**
    +     * Creates a new backwards-compatible Avro Serializer, for the given type.
    +     */
    +    public BackwardsCompatibleAvroSerializer(Class<T> type) {
    --- End diff --

    nit: Call the other constructor?

---
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924447

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924339

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -18,118 +18,111 @@
     package org.apache.flink.formats.avro.typeutils;
    -import org.apache.flink.annotation.Internal;
    -import org.apache.flink.api.common.ExecutionConfig;
     import org.apache.flink.api.common.typeutils.CompatibilityResult;
     import org.apache.flink.api.common.typeutils.TypeSerializer;
     import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    -import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
     import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
    -import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
    -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
     import org.apache.flink.formats.avro.utils.DataInputDecoder;
     import org.apache.flink.formats.avro.utils.DataOutputEncoder;
     import org.apache.flink.util.InstantiationUtil;
    -import org.apache.flink.util.Preconditions;
    -import com.esotericsoftware.kryo.Kryo;
    -import org.apache.avro.generic.GenericData;
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaCompatibility;
    +import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
    +import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
    +import org.apache.avro.reflect.ReflectData;
     import org.apache.avro.reflect.ReflectDatumReader;
     import org.apache.avro.reflect.ReflectDatumWriter;
    -import org.apache.avro.util.Utf8;
    -import org.objenesis.strategy.StdInstantiatorStrategy;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificDatumWriter;
    +import org.apache.avro.specific.SpecificRecord;
     import java.io.IOException;
    -import java.io.ObjectInputStream;
    -import java.util.LinkedHashMap;
    -import java.util.Map;
     import static org.apache.flink.util.Preconditions.checkNotNull;
     /**
    - * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
    - * Kryo for deep object copies. We want to change this to Kryo-only.
    + * A serializer that serializes types via Avro.
      *
    - * @param <T> The type serialized.
    + * The serializer supports both efficient specific record serialization via for
    --- End diff --

    nit: "via for"

---
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924346

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -18,118 +18,111 @@
     package org.apache.flink.formats.avro.typeutils;
    -import org.apache.flink.annotation.Internal;
    -import org.apache.flink.api.common.ExecutionConfig;
     import org.apache.flink.api.common.typeutils.CompatibilityResult;
     import org.apache.flink.api.common.typeutils.TypeSerializer;
     import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    -import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
     import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
    -import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
    -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
     import org.apache.flink.formats.avro.utils.DataInputDecoder;
     import org.apache.flink.formats.avro.utils.DataOutputEncoder;
     import org.apache.flink.util.InstantiationUtil;
    -import org.apache.flink.util.Preconditions;
    -import com.esotericsoftware.kryo.Kryo;
    -import org.apache.avro.generic.GenericData;
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaCompatibility;
    +import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
    +import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
    +import org.apache.avro.reflect.ReflectData;
     import org.apache.avro.reflect.ReflectDatumReader;
     import org.apache.avro.reflect.ReflectDatumWriter;
    -import org.apache.avro.util.Utf8;
    -import org.objenesis.strategy.StdInstantiatorStrategy;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificDatumWriter;
    +import org.apache.avro.specific.SpecificRecord;
     import java.io.IOException;
    -import java.io.ObjectInputStream;
    -import java.util.LinkedHashMap;
    -import java.util.Map;
     import static org.apache.flink.util.Preconditions.checkNotNull;
     /**
    - * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
    - * Kryo for deep object copies. We want to change this to Kryo-only.
    + * A serializer that serializes types via Avro.
      *
    - * @param <T> The type serialized.
    + * The serializer supports both efficient specific record serialization via for
    + * types generated via Avro, as well as serialization via reflection
    + * (ReflectDatumReader / -Writer). The serializer instantiated the types depending on
    --- End diff --

    nit: "instantiated" => "instantiates"

---
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/4943

    [FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State

    ## What is the purpose of the change

    This changes Avro types to be serialized with a proper Avro serializer. The Avro serializer efficiently handles both Specific Records (generated by Avro) and Avro-reflection-based serialization.

    In order to maintain backwards compatibility, the Avro type info actually generates a wrapping serializer that falls back to a Pojo (or Kryo) serializer when being reconfigured from an old snapshot.

    ## Brief change log

      - Adds a proper Avro type serializer
      - Adds a backwards-compatible Avro serializer that falls back to Pojo/Kryo on old snapshots
      - Adds a bunch of tests

    ## Verifying this change

      - Using Avro specific record types in the program and enjoying nice performant execution ;-)
      - Using Avro for Flink state and getting it serialized via Avro, allowing a schema upgrade of state
      - Running the added unit tests

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (yes / **no)**
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no)**
      - The serializers: **(yes** / no / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

    ## Documentation

      - Does this pull request introduce a new feature? (yes / **no)**
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink use_proper_avro

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4943.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4943

commit dd05a3bf3471702ac8c9129d2d80f2feeca0f949
Author: Stephan Ewen
Date:   2017-11-03T13:47:33Z

    [FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State

    This falls back to the original serializer (Pojo / Kryo) in cases where an old snapshot is resumed.

---
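The fallback described in the pull request text can be sketched without Flink on the classpath. Everything below is a simplified assumption for illustration: `SimpleSerializer`, `SnapshotKind`, `TaggingSerializer` and `FallbackSerializer` are hypothetical names, and the "serializers" only prefix a tag so the active delegate is visible. The real classes work against Flink's `TypeSerializer` and config snapshots, which this sketch does not reproduce.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical, minimal serializer contract; not Flink's TypeSerializer. */
interface SimpleSerializer<T> {
    byte[] serialize(T value);
}

/** Hypothetical summary of which serializer wrote an old snapshot. */
enum SnapshotKind { AVRO_SCHEMA, LEGACY_POJO, LEGACY_KRYO }

/** Toy serializer that prefixes a tag, so tests can see which delegate ran. */
final class TaggingSerializer<T> implements SimpleSerializer<T> {
    private final String tag;

    TaggingSerializer(String tag) {
        this.tag = tag;
    }

    @Override
    public byte[] serialize(T value) {
        return (tag + ":" + value).getBytes(StandardCharsets.UTF_8);
    }
}

/** Wrapper that uses "avro" by default and switches delegate for legacy snapshots. */
final class FallbackSerializer<T> implements SimpleSerializer<T> {
    private SimpleSerializer<T> delegate = new TaggingSerializer<>("avro");

    /** Picks the delegate matching the format the old snapshot was written in. */
    void reconfigureFrom(SnapshotKind kind) {
        switch (kind) {
            case LEGACY_POJO:
                delegate = new TaggingSerializer<>("pojo");
                break;
            case LEGACY_KRYO:
                delegate = new TaggingSerializer<>("kryo");
                break;
            default:
                // AVRO_SCHEMA: keep the Avro delegate
                break;
        }
    }

    @Override
    public byte[] serialize(T value) {
        return delegate.serialize(value);
    }
}
```

Callers hold only the wrapper, so swapping the delegate on restore stays invisible to them; that is the property the description relies on for resuming old snapshots.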