[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243741#comment-16243741 ]

Robert Metzger commented on FLINK-6022:
---------------------------------------

I agree. When I opened the issue, we were sending the Avro schema with every Avro GenericRecord over the wire (the schema is a JSON string). The purpose of this JIRA is to put the JSON schema into the TypeInformation and then serialize the GenericRecords based on the schema from the TypeInformation.

> Improve support for Avro GenericRecord
> --------------------------------------
>
>                 Key: FLINK-6022
>                 URL: https://issues.apache.org/jira/browse/FLINK-6022
>             Project: Flink
>          Issue Type: Improvement
>          Components: Type Serialization System
>            Reporter: Robert Metzger
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by shipping the schema to each serializer through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but the performance will be much better.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
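The saving discussed in this comment can be sketched with a small, stdlib-only Python model (not Flink or real Avro code; the schema and record shapes are illustrative): if the JSON schema travels once inside the type information instead of next to every record, the per-record bytes shrink to just the encoded fields.

```python
import json

# Illustrative Avro-style schema as a JSON string (this is what was shipped per record).
schema_json = json.dumps({
    "type": "record", "name": "Event",
    "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}],
})
records = [{"id": i, "name": "event-%d" % i} for i in range(1000)]

def encode(record):
    # Stand-in for the binary record encoding; only relative sizes matter here.
    return json.dumps(record).encode()

# Before: the schema string is written next to every record on the wire.
per_record = sum(len(schema_json.encode()) + len(encode(r)) for r in records)

# After: the schema ships once (inside the TypeInformation); records carry only data.
schema_once = len(schema_json.encode()) + sum(len(encode(r)) for r in records)

print(per_record, schema_once)
```

For 1000 records the schema is paid for 999 fewer times, which is exactly the wire/disk overhead the issue describes.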
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243726#comment-16243726 ]

Aljoscha Krettek commented on FLINK-6022:
-----------------------------------------

[~rmetzger] opened this but the issue description has nothing to do with your changes, I think.
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241747#comment-16241747 ]

Stephan Ewen commented on FLINK-6022:
-------------------------------------

I think this should be resolved, because generic records and all non-specific records should go through the ReflectDatumReader/Writer. However, it would be great if one of the people who originally opened the ticket could comment on this.
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241726#comment-16241726 ]

Aljoscha Krettek commented on FLINK-6022:
-----------------------------------------

[~StephanEwen] We should probably open a follow-up issue and mark this one as resolved? Or move to 1.5.
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240735#comment-16240735 ]

Stephan Ewen commented on FLINK-6022:
-------------------------------------

With the merged pull request, Avro specific records now automatically go through Avro, and the schema is communicated via the TypeSerializer's parameters. The Avro serializer also readily handles all non-specific records via the reflect datum readers and writers.
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240443#comment-16240443 ]

ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4943

    Btw, now users have to manually depend on `flink-avro` in their projects where before they didn't necessarily have to, right? If yes, we should also put it in the release notes.
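For reference, the explicit dependency discussed here would look roughly like this in a user's pom.xml (a sketch; the version property is a placeholder for whatever Flink release the project builds against):

```xml
<!-- Avro support now lives in user code space; declare it explicitly. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
</dependency>
```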
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240442#comment-16240442 ]

ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4943

    I would suggest adding that last paragraph in the `release notes` field of FLINK-6022 so we don't forget about this.
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240333#comment-16240333 ]

ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4943

    I updated this PR with the following proposed solution:

    1. Avro is always part of the user code space, and hence will be loaded into the user code classloader. This solves multiple problems:
       - It allows users to use a different Avro version compared to the version in a spoiled classpath (for example when Hadoop dependencies pull in another Avro version).
       - It means that Avro class and schema caching is done per user code classloader, not JVM-wide. That prevents "X cannot be cast to X" exceptions.

    2. By default, a new pure Avro serializer is used for Avro types (in flight and in state). Since the Avro class format changed, having compatibility activated by default is not possible anyway.

    3. If a user wants to recover an old savepoint that happened to have Avro types serialized as POJOs, they need to do the following:
       - Set the Avro version to the same version that they had when creating the savepoint. Since Avro now lives in user code space, that is possible without changing/rebuilding Flink.
       - Pass a flag to the `AvroTypeInfo` to use a backwards-compatible serializer: `new AvroTypeInfo(myType, true)`.
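The flag in the last bullet would be used roughly like this when declaring the type of an operation (an illustrative sketch only, not a tested snippet; `MyAvroRecord` and `MyMapper` stand for a user's Avro-generated class and map function):

```java
// Sketch: restoring a savepoint whose Avro types were written by the old POJO serializer.
// The "true" argument selects the backwards-compatible serializer path described above.
DataStream<MyAvroRecord> stream = input
        .map(new MyMapper())
        .returns(new AvroTypeInfo<>(MyAvroRecord.class, true));
```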
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239808#comment-16239808 ]

ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4943

    Thanks for the review, addressing the comments.

    I found the problem with the backwards compatibility: it is the Avro version upgrade. Avro types generated with Avro 1.7.7 (Flink 1.3) and serialized as POJOs don't work when read into POJOs generated from the same schema in Avro 1.8.2 (Flink 1.4). The Avro class structure changed, breaking the POJO compatibility. Avro is binary format compatible, but not class compatible across versions, it seems. Makes sense for a serialization format.

    Now, this brings up a few issues:
      - Users that use generated Avro types from earlier Avro versions would be forced to upgrade (re-create) these classes with the new Avro version, if they upgrade Flink and it pulls in a newer Avro version. We might solve that by removing Avro from `flink-dist` and pushing it into "user code space", where users can dependency-manage the Avro version they want and need.
      - In light of that, actually retaining Avro/POJO backwards compatibility seems super complicated. I would almost suggest not doing that, because it seems it will not be transparent for users anyway (with Avro itself breaking compatibility).
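The "binary format compatible, but not class compatible" distinction can be illustrated with a toy, stdlib-only sketch (this is not Avro itself; the name-based field resolution below merely mimics how Avro resolves a writer schema against a reader schema): bytes written under an old schema remain readable under a new schema by matching fields by name, even though anything coupled to the old generated class layout would break.

```python
import json

# Toy stand-in for Avro schemas: just ordered field-name lists.
writer_schema = ["id", "name", "score"]   # fields as written by the old job (Avro 1.7.7 era)
reader_schema = ["name", "id", "level"]   # newer schema: dropped "score", added "level"

def encode(record, schema):
    # Write field values in writer-schema order; the schema travels separately.
    return json.dumps([record[f] for f in schema]).encode()

def decode(data, writer_schema, reader_schema, defaults):
    values = dict(zip(writer_schema, json.loads(data)))
    # Resolve by field name: fields unknown to the reader are skipped,
    # fields missing from the writer take the reader's defaults.
    return {f: values.get(f, defaults.get(f)) for f in reader_schema}

old_bytes = encode({"id": 7, "name": "flink", "score": 0.9}, writer_schema)
record = decode(old_bytes, writer_schema, reader_schema, defaults={"level": 0})
print(record)
```

The old bytes decode cleanly into the new shape; it is only the generated class (the POJO layout) that cannot survive the version jump, which is exactly why the POJO-based compatibility path is so hard to retain.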
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239787#comment-16239787 ]

ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148977803

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.typeutils;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
    +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
    +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
    +
    +import org.apache.avro.specific.SpecificRecordBase;
    +
    +import java.io.IOException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * An Avro serializer that can switch back to a KryoSerializer or a Pojo Serializer, if
    + * it has to ensure compatibility with one of those.
    + *
    + * @param <T> The type to be serialized.
    + */
    +public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> {
    +
    +    private static final long serialVersionUID = 1L;
    +
    +    /** The type to serialize. */
    +    private final Class<T> type;
    +
    +    /** The type serializer currently used. Avro by default. */
    +    private TypeSerializer<T> serializer;
    +
    +    /**
    +     * Creates a new backwards-compatible Avro Serializer, for the given type.
    +     */
    +    public BackwardsCompatibleAvroSerializer(Class<T> type) {
    +        this.type = type;
    +        this.serializer = new AvroSerializer<>(type);
    +    }
    +
    +    /**
    +     * Private copy constructor.
    +     */
    +    private BackwardsCompatibleAvroSerializer(Class<T> type, TypeSerializer<T> serializer) {
    +        this.type = type;
    +        this.serializer = serializer;
    +    }
    +
    +    // ------------------------------------------------------------------------
    +    //  Properties
    +    // ------------------------------------------------------------------------
    +
    +    @Override
    +    public boolean isImmutableType() {
    +        return serializer.isImmutableType();
    +    }
    +
    +    @Override
    +    public int getLength() {
    +        return serializer.getLength();
    +    }
    +
    +    // ------------------------------------------------------------------------
    +    //  Serialization
    +    // ------------------------------------------------------------------------
    +
    +    @Override
    +    public T createInstance() {
    +        return serializer.createInstance();
    +    }
    +
    +    @Override
    +    public void serialize(T value, DataOutputView target) throws IOException {
    +        serializer.serialize(value, target);
    +    }
    +
    +    @Override
    +    public T deserialize(DataInputView source) throws IOException {
    +        return serializer.deserialize(source);
    +    }
    +
    +    @Override
    +    public T deserialize(T reuse, DataInputView source) throws IOException {
    +        return serializer.deserialize(reuse, source);
    +    }
    +
    +    // ------------------------------------------------------------------------
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239786#comment-16239786 ]

ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148977780

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---

    +    public BackwardsCompatibleAvroSerializer(Class<T> type) {

    --- End diff --

    The other constructor is a copy constructor, would rather not call it...
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239785#comment-16239785 ]

ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148977758

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
         return this.reader.read(reuse, this.decoder);
     }
    +
    +    // ------------------------------------------------------------------------
    +    //  Copying
    +    // ------------------------------------------------------------------------
    +
         @Override
    -    public void copy(DataInputView source, DataOutputView target) throws IOException {
    +    public T copy(T from) {
             checkAvroInitialized();
    +        return avroData.deepCopy(schema, from);
    +    }
 
    -        if (this.deepCopyInstance == null) {
    -            this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
    -        }
    -
    -        this.decoder.setIn(source);
    -        this.encoder.setOut(target);
    +    @Override
    +    public T copy(T from, T reuse) {
    +        return copy(from);
    +    }
 
    -        T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
    -        this.writer.write(tmp, this.encoder);
    +    @Override
    +    public void copy(DataInputView source, DataOutputView target) throws IOException {
    +        T value = deserialize(source);
    +        serialize(value, target);
         }
 
    -    private void checkAvroInitialized() {
    -        if (this.reader == null) {
    -            this.reader = new ReflectDatumReader<T>(type);
    -            this.writer = new ReflectDatumWriter<T>(type);
    -            this.encoder = new DataOutputEncoder();
    -            this.decoder = new DataInputDecoder();
    +    // ------------------------------------------------------------------------
    +    //  Compatibility and Upgrades
    +    // ------------------------------------------------------------------------
    +
    +    @Override
    +    public TypeSerializerConfigSnapshot snapshotConfiguration() {
    +        if (configSnapshot == null) {
    +            checkAvroInitialized();
    +            configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false));
             }
    +        return configSnapshot;
         }
 
    -    private void checkKryoInitialized() {
    -        if (this.kryo == null) {
    -            this.kryo = new Kryo();
    -
    -            Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
    -            instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
    -            kryo.setInstantiatorStrategy(instantiatorStrategy);
    +    @Override
    +    @SuppressWarnings("deprecation")
    +    public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    +        if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) {
    +            // proper schema snapshot, can do the sophisticated schema-based compatibility check
    +            final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
    +            final Schema lastSchema = new Schema.Parser().parse(schemaString);
 
    -            kryo.setAsmEnabled(true);
    +            final SchemaPairCompatibility compatibility =
    +                    SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
 
    -            KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
    +            return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ?
    +                    CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
    +        }
    +        else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
    +            // old snapshot case, just compare the type
    +            // we don't need to restore any Kryo stuff, since Kryo was never used for persistence,
    +            // only for object-to-object copies.
    +            final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot;
    +            return type.equals(old.getTypeClass()) ?
    +                    CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238935#comment-16238935 ]

ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148928504

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238831#comment-16238831 ]

ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924339

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -18,118 +18,111 @@
 
     package org.apache.flink.formats.avro.typeutils;
 
    -import org.apache.flink.annotation.Internal;
    -import org.apache.flink.api.common.ExecutionConfig;
     import org.apache.flink.api.common.typeutils.CompatibilityResult;
     import org.apache.flink.api.common.typeutils.TypeSerializer;
     import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    -import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
     import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
    -import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
    -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
     import org.apache.flink.formats.avro.utils.DataInputDecoder;
     import org.apache.flink.formats.avro.utils.DataOutputEncoder;
     import org.apache.flink.util.InstantiationUtil;
    -import org.apache.flink.util.Preconditions;
 
    -import com.esotericsoftware.kryo.Kryo;
    -import org.apache.avro.generic.GenericData;
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaCompatibility;
    +import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
    +import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
    +import org.apache.avro.reflect.ReflectData;
     import org.apache.avro.reflect.ReflectDatumReader;
     import org.apache.avro.reflect.ReflectDatumWriter;
    -import org.apache.avro.util.Utf8;
    -import org.objenesis.strategy.StdInstantiatorStrategy;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificDatumWriter;
    +import org.apache.avro.specific.SpecificRecord;
 
     import java.io.IOException;
    -import java.io.ObjectInputStream;
    -import java.util.LinkedHashMap;
    -import java.util.Map;
 
     import static org.apache.flink.util.Preconditions.checkNotNull;
 
     /**
    - * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
    - * Kryo for deep object copies. We want to change this to Kryo-only.
    + * A serializer that serializes types via Avro.
      *
    - * @param <T> The type serialized.
    + * The serializer supports both efficient specific record serialization via for

    --- End diff --

    nit: "via for"
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238832#comment-16238832 ]

ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924463

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238829#comment-16238829 ] ASF GitHub Bot commented on FLINK-6022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4943#discussion_r148924503 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot; +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot; + +import org.apache.avro.specific.SpecificRecordBase; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * An Avro serializer that can switch back to a KryoSerializer or a Pojo Serializer, if + * it has to ensure compatibility with one of those. + * + * @param <T> The type to be serialized. + */ +public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> { + + private static final long serialVersionUID = 1L; + + /** The type to serialize. */ + private final Class<T> type; + + /** The type serializer currently used. Avro by default. */ + private TypeSerializer<T> serializer; + + /** +* Creates a new backwards-compatible Avro Serializer, for the given type. +*/ + public BackwardsCompatibleAvroSerializer(Class<T> type) { --- End diff -- nit: Call the other constructor?
> Improve support for Avro GenericRecord > -- > > Key: FLINK-6022 > URL: https://issues.apache.org/jira/browse/FLINK-6022 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Robert Metzger >Priority: Major > > Currently, Flink is serializing the schema for each Avro GenericRecord in the > stream. > This leads to a lot of overhead over the wire/disk + high serialization costs. > Therefore, I'm proposing to improve the support for GenericRecord in Flink by > shipping the schema to each serializer through the AvroTypeInformation. > Then, we can only support GenericRecords with the same type per stream, but > the performance will be much better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
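The constructor-chaining nit above ("Call the other constructor?") can be sketched in plain Java. The class and field names below are illustrative stand-ins, not the actual Flink sources; the point is that the public constructor delegates to the private copy constructor via `this(...)` so the field assignments are written only once.

```java
// Sketch of the reviewer's suggestion: chain constructors instead of
// duplicating the field assignments. Names are stand-ins, not Flink API.
class BackwardsCompatibleSerializerSketch<T> {

    private final Class<T> type;
    private final String delegateName; // stands in for the TypeSerializer<T> delegate

    /** Public constructor: delegates instead of repeating the assignments. */
    BackwardsCompatibleSerializerSketch(Class<T> type) {
        this(type, "AvroSerializer<" + type.getSimpleName() + ">");
    }

    /** Private copy constructor; the single place where fields are assigned. */
    private BackwardsCompatibleSerializerSketch(Class<T> type, String delegateName) {
        this.type = type;
        this.delegateName = delegateName;
    }

    String describe() {
        return type.getSimpleName() + " -> " + delegateName;
    }
}
```

With chaining, a later change to the initialization logic only has to touch the private constructor.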
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238829#comment-16238829 ] ASF GitHub Bot commented on FLINK-6022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4943#discussion_r148924503 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238833#comment-16238833 ] ASF GitHub Bot commented on FLINK-6022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4943#discussion_r148924346 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -18,118 +18,111 @@ package org.apache.flink.formats.avro.typeutils; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.java.typeutils.runtime.KryoRegistration; import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; -import org.apache.flink.api.java.typeutils.runtime.KryoUtils; -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.formats.avro.utils.DataInputDecoder; import org.apache.flink.formats.avro.utils.DataOutputEncoder; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Preconditions; -import com.esotericsoftware.kryo.Kryo; -import org.apache.avro.generic.GenericData; +import org.apache.avro.Schema; +import org.apache.avro.SchemaCompatibility; +import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType; +import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.util.Utf8; -import org.objenesis.strategy.StdInstantiatorStrategy; +import 
org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; import java.io.IOException; -import java.io.ObjectInputStream; -import java.util.LinkedHashMap; -import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and - * Kryo for deep object copies. We want to change this to Kryo-only. + * A serializer that serializes types via Avro. * - * @param The type serialized. + * The serializer supports both efficient specific record serialization via for + * types generated via Avro, as well as serialization via reflection + * (ReflectDatumReader / -Writer). The serializer instantiated the types depending on --- End diff -- nit: "instantiated" => "instantiates"
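The dispatch the Javadoc above describes — a fast "specific" path for Avro-generated record classes, reflection for everything else — can be sketched with a stand-in marker interface. `SpecificRecordStandIn` plays the role of Avro's `SpecificRecord`; the real serializer instantiates `SpecificDatumReader`/`Writer` or `ReflectDatumReader`/`Writer` rather than returning strings.

```java
// Stand-in sketch of specific-vs-reflect dispatch; not the actual Flink code.
interface SpecificRecordStandIn {}

class GeneratedUser implements SpecificRecordStandIn {} // would be an Avro-generated class

class PlainPojo {} // a hand-written POJO, no generated code

class DatumPathChooser {
    static String choosePath(Class<?> type) {
        // Generated record types implement the marker interface, so a simple
        // assignability check is enough to pick the serialization path.
        return SpecificRecordStandIn.class.isAssignableFrom(type)
                ? "specific"  // SpecificDatumReader / SpecificDatumWriter
                : "reflect";  // ReflectDatumReader / ReflectDatumWriter
    }
}
```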
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238834#comment-16238834 ] ASF GitHub Bot commented on FLINK-6022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4943#discussion_r148924545 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot; +import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot; + +import org.apache.avro.specific.SpecificRecordBase; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * An Avro serializer that can switch back to a KryoSerializer or a Pojo Serializer, if + * it has to ensure compatibility with one of those. + * + * @param The type to be serialized. + */ +public class BackwardsCompatibleAvroSerializer extends TypeSerializer { + + private static final long serialVersionUID = 1L; + + /** The type to serialize. */ + private final Class type; + + /** The type serializer currently used. Avro by default. */ + private TypeSerializer serializer; + + /** +* Creates a new backwards-compatible Avro Serializer, for the given type. +*/ + public BackwardsCompatibleAvroSerializer(Class type) { + this.type = type; + this.serializer = new AvroSerializer<>(type); + } + + /** +* Private copy constructor. 
+*/ + private BackwardsCompatibleAvroSerializer(Class type, TypeSerializer serializer) { + this.type = type; + this.serializer = serializer; + } + + // + // Properties + // + + @Override + public boolean isImmutableType() { + return serializer.isImmutableType(); + } + + @Override + public int getLength() { + return serializer.getLength(); + } + + // + // Serialization + // + + @Override + public T createInstance() { + return serializer.createInstance(); + } + + @Override + public void serialize(T value, DataOutputView target) throws IOException { + serializer.serialize(value, target); + } + + @Override + public T deserialize(DataInputView source) throws IOException { + return serializer.deserialize(source); + } + + @Override + public T deserialize(T reuse, DataInputView source) throws IOException { + return serializer.deserialize(reuse, source); + } + + //
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238287#comment-16238287 ] ASF GitHub Bot commented on FLINK-6022: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/4943 [FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State ## What is the purpose of the change This changes Avro types to be serialized with a proper Avro serializer. The Avro serializer efficiently handles both Specific Records (generated by Avro) and Avro-reflection-based serialization. In order to maintain backwards compatibility, the Avro type info actually generates a wrapping serializer that falls back to a Pojo (or Kryo) serializer when being reconfigured from an old snapshot. ## Brief change log - Adds a proper Avro type serializer - Adds a backwards-compatible Avro serializer that falls back to Pojo/Kryo on old snapshots - Adds a bunch of tests ## Verifying this change - Using Avro specific record types in the program and enjoying nice performant execution ;-) - Using Avro for Flink state and getting it serialized via Avro, allowing a schema upgrade of state - Running the added unit tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (**yes** / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented?
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink use_proper_avro Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4943.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4943 commit dd05a3bf3471702ac8c9129d2d80f2feeca0f949 Author: Stephan Ewen, Date: 2017-11-03T13:47:33Z [FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State This falls back to the original serializer (Pojo / Kryo) in cases where an old snapshot is resumed.
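The fallback behaviour the PR describes — Avro by default, with the wrapper switching to the legacy POJO/Kryo serializer only when it is reconfigured from an old snapshot — can be sketched as follows. This is a minimal stand-in with string delegates, not the Flink `TypeSerializer` API.

```java
// Sketch of the backwards-compatible wrapper's delegate selection.
class FallbackSerializerSketch {
    enum SnapshotKind { AVRO_SCHEMA, OLD_POJO, OLD_KRYO }

    private String delegate = "avro"; // new jobs serialize via Avro

    String currentDelegate() {
        return delegate;
    }

    /** Reconfigure from a restored snapshot, falling back if it predates Avro. */
    void reconfigureFrom(SnapshotKind snapshot) {
        switch (snapshot) {
            case OLD_POJO:
                delegate = "pojo"; // keep old POJO state readable
                break;
            case OLD_KRYO:
                delegate = "kryo"; // keep old Kryo state readable
                break;
            default:
                break; // AVRO_SCHEMA: keep the Avro delegate
        }
    }
}
```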
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924180#comment-15924180 ] Robert Metzger commented on FLINK-6022: --- There is actually a way to register anything serializable with the execution config: "setGlobalJobParameters(GlobalJobParameters params)". The main use case for that is showing the job parameters in the web frontend (the ParameterTool has support for that as well). Also, the GlobalJobParameters are accessible everywhere in the user code (when using the Rich* variants). Having said all this, I would NOT recommend using the GlobalJobParameters for the Avro serializer. The much more appropriate place for shipping some serialized data (that is specific to a serializer) from the user APIs to the cluster is the TypeInformation. By putting the schema of the generic records into the {{AvroTypeInfo}} (or something similar for GenericAvroRecords), you'll have the schema available on all serializers.
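The cost being discussed can be illustrated with back-of-the-envelope arithmetic (plain JDK, made-up payload size): with the schema embedded per record, the JSON string is paid for on every record; shipped once via the TypeInformation, it is paid for a single time.

```java
import java.nio.charset.StandardCharsets;

// Rough wire-size comparison for schema-per-record vs. schema-shipped-once.
// Sizes are illustrative; real Avro framing differs.
class SchemaOverheadSketch {
    static long bytesWithSchemaPerRecord(String schemaJson, int payloadBytes, int records) {
        long schemaBytes = schemaJson.getBytes(StandardCharsets.UTF_8).length;
        // Every record carries the full JSON schema plus its payload.
        return (long) records * (schemaBytes + payloadBytes);
    }

    static long bytesWithSchemaShippedOnce(String schemaJson, int payloadBytes, int records) {
        long schemaBytes = schemaJson.getBytes(StandardCharsets.UTF_8).length;
        // The schema travels once (via the TypeInformation); records carry only payload.
        return schemaBytes + (long) records * payloadBytes;
    }
}
```

Even with a tiny 17-byte schema and 20-byte payloads, 1000 records cost 37,000 bytes per-record versus 20,017 bytes schema-once; realistic record schemas are hundreds of bytes of JSON, so the gap is far larger.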
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922810#comment-15922810 ] Billy Newport commented on FLINK-6022: --- Thanks for this Robert. Basically, to help with the ones we've implemented, we'd need a way of registering our schema objects on the ExecutionConfig and then looking them up on deserialization; a one-off call when the ExecutionConfig is inflated would also work. To be honest, we'd just need a way of registering a map of serializable state on the ExecutionConfig. That would be all we would need, at least. We are a little different than most, I think, in that we deal exclusively with GenericRecords with predeclared schemas, no code-generated POJOs at all. We've kicked off the internal process of contributing, so hopefully myself or Regina Chan (also here) can help contribute to this.
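What this comment asks for could look roughly like the following — a serializable registry of pre-declared schema JSON strings, registered up front and looked up by name on the deserialization side. This is a hypothetical helper for illustration, not an existing Flink or ExecutionConfig API.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical serializable schema registry, sketching the "map of
// serializable state" the commenter describes.
class SchemaRegistrySketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Map<String, String> schemasByName = new HashMap<>();

    void register(String name, String schemaJson) {
        schemasByName.put(name, schemaJson);
    }

    String lookup(String name) {
        String schema = schemasByName.get(name);
        if (schema == null) {
            throw new IllegalArgumentException("unknown schema: " + name);
        }
        return schema;
    }
}
```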
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904920#comment-15904920 ] Robert Metzger commented on FLINK-6022: --- I think protobuf always works with generated serializers with a fixed schema (that's the common case for Avro as well), so I don't think there's a need to add support there. For Thrift I don't know. I haven't heard any complaints for Thrift and Protobuf. For Avro this issue has come up with at least two users.
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904899#comment-15904899 ] Flavio Pompermaier commented on FLINK-6022: --- Hi [~rmetzger], does this also apply to Thrift and Protobuf? Should we open an issue for them as well?