[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-08 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243741#comment-16243741
 ] 

Robert Metzger commented on FLINK-6022:
---

I agree. When I opened the issue, we were sending the Avro schema with every 
Avro GenericRecord over the wire (the schema is a JSON string).
The purpose of this JIRA is to put the JSON schema into the TypeInformation and 
then serialize the GenericRecords based on the schema from the 
TypeInformation.
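
Robert's point can be made concrete with a hypothetical back-of-envelope sketch (plain Java, not Flink code; the schema string and the 16-byte record size below are made-up illustrative numbers): shipping the JSON schema with every record easily dwarfs the record payload itself.

```java
public class SchemaOverheadSketch {

    // Total bytes on the wire for `records` records, either shipping the
    // schema with every record or shipping it once up front.
    static long totalBytes(long records, int schemaBytes, int payloadBytes,
                           boolean schemaPerRecord) {
        return schemaPerRecord
                ? records * (long) (schemaBytes + payloadBytes)
                : schemaBytes + records * (long) payloadBytes;
    }

    public static void main(String[] args) {
        // A small Avro record schema as a JSON string (illustrative).
        String schemaJson = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"name\",\"type\":\"string\"}]}";
        int schemaBytes = schemaJson
                .getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
        int payloadBytes = 16; // assumed size of one encoded record

        System.out.println("schema per record:   "
                + totalBytes(1_000_000, schemaBytes, payloadBytes, true) + " bytes");
        System.out.println("schema shipped once: "
                + totalBytes(1_000_000, schemaBytes, payloadBytes, false) + " bytes");
    }
}
```

With a schema of ~100 bytes and records of ~16 bytes, the per-record variant is roughly seven times larger on the wire, which is the overhead this ticket set out to remove.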

> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, Flink serializes the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk plus high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer through the AvroTypeInformation.
> With that, we can only support GenericRecords of the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-08 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243726#comment-16243726
 ] 

Aljoscha Krettek commented on FLINK-6022:
-

[~rmetzger] opened this but the issue description has nothing to do with your 
changes, I think.



[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241747#comment-16241747
 ] 

Stephan Ewen commented on FLINK-6022:
-

I think this should be resolved, because generic records and all non-specific 
records should go through the ReflectDatumReader/Writer.

However, it would be great if one of the people who originally opened the 
ticket could comment on this.



[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-07 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241726#comment-16241726
 ] 

Aljoscha Krettek commented on FLINK-6022:
-

[~StephanEwen] We should probably open a follow-up issue and mark this one as 
resolved? Or move to 1.5.



[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-06 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240735#comment-16240735
 ] 

Stephan Ewen commented on FLINK-6022:
-

With the merged pull request, Avro specific records now automatically go 
through Avro, and the schema is communicated via the TypeSerializer's parameters.

The Avro serializer also readily handles all non-specific records via the 
reflect datum readers and writers.



[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240443#comment-16240443
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4943
  
 

Btw, now users have to manually depend on `flink-avro` in their projects 
where before they didn't necessarily have to, right? If yes, we should also put 
it in the release notes.
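
For readers hitting this after an upgrade: since Avro support was moved into the separate `flink-avro` module, projects that previously got it transitively need an explicit dependency along these lines (version shown for Flink 1.4.0; adjust to the Flink version in use):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.4.0</version>
</dependency>
```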




[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240442#comment-16240442
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4943
  
I would suggest adding that last paragraph in the `release notes` field of 
FLINK-6022 so we don't forget about this.




[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240333#comment-16240333
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4943
  
I updated this PR with the following proposed solution:

  1. Avro is always part of the user code space, and hence will be loaded 
into the user code classloader. This solves multiple problems:
 - It allows users to use a different Avro version than the one on a 
polluted classpath (for example when Hadoop dependencies pull in 
another Avro version).
 - It means that Avro class and schema caching is done per user code 
classloader, not JVM-wide. That prevents "X cannot be cast to X" exceptions.

  2. By default, a new pure Avro serializer is used for Avro types (in 
flight and in state). Since the Avro class format changed, having compatibility 
activated by default is not possible anyway.

  3. If a user wants to recover an old savepoint that happened to have Avro 
types serialized as POJOs, they need to do the following:
 - Set the Avro version to the same version they had when creating 
the savepoint. Since Avro now lives in user code space, that is possible 
without changing/rebuilding Flink.
 - Pass a flag to the `AvroTypeInfo` to use a backwards-compatible 
serializer: `new AvroTypeInfo(myType, true)`.
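
The "X cannot be cast to X" failure mode mentioned in point 1 falls out of how the JVM identifies classes: a class is keyed by its name *and* its defining classloader. The sketch below (plain Java, no Flink or Avro; `Payload` is a stand-in for a generated Avro class) re-defines the same class in a second, parentless classloader and shows that the two `Class` objects are distinct, which is why JVM-wide caching of classes across user-code classloaders breaks:

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class ClassLoaderIsolationSketch {

    /** Stand-in for a generated Avro class living in user code. */
    public static class Payload {}

    /** Re-defines {@code original} in a fresh classloader with no parent,
     *  mimicking two user-code classloaders each loading their own copy. */
    static Class<?> reloadInChildLoader(Class<?> original) throws Exception {
        String resource = original.getName().replace('.', '/') + ".class";
        byte[] raw;
        try (InputStream in = original.getClassLoader().getResourceAsStream(resource)) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            in.transferTo(buf);
            raw = buf.toByteArray();
        }
        ClassLoader isolated = new ClassLoader(null) { // null parent: bootstrap only
            @Override
            protected Class<?> findClass(String name) throws ClassNotFoundException {
                if (name.equals(original.getName())) {
                    return defineClass(name, raw, 0, raw.length);
                }
                throw new ClassNotFoundException(name);
            }
        };
        return isolated.loadClass(original.getName());
    }

    public static void main(String[] args) throws Exception {
        Class<?> reloaded = reloadInChildLoader(Payload.class);
        // Same fully-qualified name, but a different Class object: casting an
        // instance of one to the other fails with "Payload cannot be cast to Payload".
        System.out.println(reloaded.getName().equals(Payload.class.getName()));
        System.out.println(reloaded == Payload.class);
    }
}
```

Caching per user-code classloader, as the PR does, keeps each job's classes consistent within its own loader and sidesteps this problem.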




[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239808#comment-16239808
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4943
  
Thanks for the review, addressing the comments.

I found the problem with the backwards compatibility: it is the Avro 
version upgrade.
Avro types generated with Avro 1.7.7 (Flink 1.3) and serialized as POJOs 
don't work when read into POJOs generated from the same schema with Avro 1.8.2 
(Flink 1.4). The Avro class structure changed, breaking the POJO compatibility.

Avro is binary-format compatible, but not class compatible across versions, 
it seems. That makes sense for a serialization format.

Now, this brings up a few issues:
  - Users that use generated Avro types from earlier Avro versions would be 
forced to upgrade (re-create) these classes with the new Avro version if they 
upgrade Flink and it pulls in a newer Avro version. We might solve that by 
removing Avro from `flink-dist` and pushing it into "user code space", where 
users can dependency-manage the Avro version they want and need.

  - In light of that, actually retaining Avro/POJO backwards compatibility 
seems super complicated. I would almost suggest not doing that, because it 
seems it will not be transparent for users anyway (with Avro itself breaking 
compatibility).




[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239787#comment-16239787
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148977803
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
+import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An Avro serializer that can switch back to a KryoSerializer or a
+ * PojoSerializer, if it has to ensure compatibility with one of those.
+ *
+ * @param <T> The type to be serialized.
+ */
+public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   /** The type to serialize. */
+   private final Class<T> type;
+
+   /** The type serializer currently used. Avro by default. */
+   private TypeSerializer<T> serializer;
+
+   /**
+* Creates a new backwards-compatible Avro Serializer, for the given type.
+*/
+   public BackwardsCompatibleAvroSerializer(Class<T> type) {
+   this.type = type;
+   this.serializer = new AvroSerializer<>(type);
+   }
+
+   /**
+* Private copy constructor.
+*/
+   private BackwardsCompatibleAvroSerializer(Class<T> type, TypeSerializer<T> serializer) {
+   this.type = type;
+   this.serializer = serializer;
+   }
+
+   // ------------------------------------------------------------------------
+   //  Properties
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isImmutableType() {
+   return serializer.isImmutableType();
+   }
+
+   @Override
+   public int getLength() {
+   return serializer.getLength();
+   }
+
+   // ------------------------------------------------------------------------
+   //  Serialization
+   // ------------------------------------------------------------------------
+
+   @Override
+   public T createInstance() {
+   return serializer.createInstance();
+   }
+
+   @Override
+   public void serialize(T value, DataOutputView target) throws 
IOException {
+   serializer.serialize(value, target);
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   return serializer.deserialize(source);
+   }
+
+   @Override
+   public T deserialize(T reuse, DataInputView source) throws IOException {
+   return serializer.deserialize(reuse, source);
+   }
+
+   // 

  

[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239786#comment-16239786
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148977780
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
+import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An Avro serializer that can switch back to a KryoSerializer or a
+ * PojoSerializer, if it has to ensure compatibility with one of those.
+ *
+ * @param <T> The type to be serialized.
+ */
+public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   /** The type to serialize. */
+   private final Class<T> type;
+
+   /** The type serializer currently used. Avro by default. */
+   private TypeSerializer<T> serializer;
+
+   /**
+* Creates a new backwards-compatible Avro Serializer, for the given type.
+*/
+   public BackwardsCompatibleAvroSerializer(Class<T> type) {
--- End diff --

The other constructor is a copy constructor, would rather not call it...




[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239785#comment-16239785
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148977758
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
return this.reader.read(reuse, this.decoder);
}
 
+   // 

+   //  Copying
+   // 

+
@Override
-   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+   public T copy(T from) {
checkAvroInitialized();
+   return avroData.deepCopy(schema, from);
+   }
 
-   if (this.deepCopyInstance == null) {
-   this.deepCopyInstance = 
InstantiationUtil.instantiate(type, Object.class);
-   }
-
-   this.decoder.setIn(source);
-   this.encoder.setOut(target);
+   @Override
+   public T copy(T from, T reuse) {
+   return copy(from);
+   }
 
-   T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-   this.writer.write(tmp, this.encoder);
+   @Override
+   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+   T value = deserialize(source);
+   serialize(value, target);
}
 
-   private void checkAvroInitialized() {
-   if (this.reader == null) {
-   this.reader = new ReflectDatumReader(type);
-   this.writer = new ReflectDatumWriter(type);
-   this.encoder = new DataOutputEncoder();
-   this.decoder = new DataInputDecoder();
+   // 

+   //  Compatibility and Upgrades
+   // 

+
+   @Override
+   public TypeSerializerConfigSnapshot snapshotConfiguration() {
+   if (configSnapshot == null) {
+   checkAvroInitialized();
+   configSnapshot = new 
AvroSchemaSerializerConfigSnapshot(schema.toString(false));
}
+   return configSnapshot;
}
 
-   private void checkKryoInitialized() {
-   if (this.kryo == null) {
-   this.kryo = new Kryo();
-
-   Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
-   
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-   kryo.setInstantiatorStrategy(instantiatorStrategy);
+   @Override
+   @SuppressWarnings("deprecation")
+   public CompatibilityResult 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+   if (configSnapshot instanceof 
AvroSchemaSerializerConfigSnapshot) {
+   // proper schema snapshot, can do the sophisticated 
schema-based compatibility check
+   final String schemaString = 
((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
+   final Schema lastSchema = new 
Schema.Parser().parse(schemaString);
 
-   kryo.setAsmEnabled(true);
+   final SchemaPairCompatibility compatibility =
+   
SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
 
-   KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
+   return compatibility.getType() == 
SchemaCompatibilityType.COMPATIBLE ?
+   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
+   }
+   else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
+   // old snapshot case, just compare the type
+   // we don't need to restore any Kryo stuff, since Kryo 
was never used for persistence,
+   // only for object-to-object copies.
+   final AvroSerializerConfigSnapshot old = 
(AvroSerializerConfigSnapshot) configSnapshot;
+   return type.equals(old.getTypeClass()) ?
+   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
+

[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238935#comment-16238935
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148928504
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
return this.reader.read(reuse, this.decoder);
}
 
+   // 

+   //  Copying
+   // 

+
@Override
-   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+   public T copy(T from) {
checkAvroInitialized();
+   return avroData.deepCopy(schema, from);
+   }
 
-   if (this.deepCopyInstance == null) {
-   this.deepCopyInstance = 
InstantiationUtil.instantiate(type, Object.class);
-   }
-
-   this.decoder.setIn(source);
-   this.encoder.setOut(target);
+   @Override
+   public T copy(T from, T reuse) {
+   return copy(from);
+   }
 
-   T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-   this.writer.write(tmp, this.encoder);
+   @Override
+   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+   T value = deserialize(source);
+   serialize(value, target);
}
 
-   private void checkAvroInitialized() {
-   if (this.reader == null) {
-   this.reader = new ReflectDatumReader(type);
-   this.writer = new ReflectDatumWriter(type);
-   this.encoder = new DataOutputEncoder();
-   this.decoder = new DataInputDecoder();
+   // 

+   //  Compatibility and Upgrades
+   // 

+
+   @Override
+   public TypeSerializerConfigSnapshot snapshotConfiguration() {
+   if (configSnapshot == null) {
+   checkAvroInitialized();
+   configSnapshot = new 
AvroSchemaSerializerConfigSnapshot(schema.toString(false));
}
+   return configSnapshot;
}
 
-   private void checkKryoInitialized() {
-   if (this.kryo == null) {
-   this.kryo = new Kryo();
-
-   Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
-   
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-   kryo.setInstantiatorStrategy(instantiatorStrategy);
+   @Override
+   @SuppressWarnings("deprecation")
+   public CompatibilityResult 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+   if (configSnapshot instanceof 
AvroSchemaSerializerConfigSnapshot) {
+   // proper schema snapshot, can do the sophisticated 
schema-based compatibility check
+   final String schemaString = 
((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
+   final Schema lastSchema = new 
Schema.Parser().parse(schemaString);
 
-   kryo.setAsmEnabled(true);
+   final SchemaPairCompatibility compatibility =
+   
SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
 
-   KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
+   return compatibility.getType() == 
SchemaCompatibilityType.COMPATIBLE ?
+   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
+   }
+   else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
+   // old snapshot case, just compare the type
+   // we don't need to restore any Kryo stuff, since Kryo 
was never used for persistence,
+   // only for object-to-object copies.
+   final AvroSerializerConfigSnapshot old = 
(AvroSerializerConfigSnapshot) configSnapshot;
+   return type.equals(old.getTypeClass()) ?
+   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
+   

[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238831#comment-16238831
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924339
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -18,118 +18,111 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
 import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.formats.avro.utils.DataInputDecoder;
 import org.apache.flink.formats.avro.utils.DataOutputEncoder;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
 
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.avro.generic.GenericData;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
+import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * General purpose serialization. Currently using Apache Avro's 
Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
+ * A serializer that serializes types via Avro.
  *
- * @param  The type serialized.
+ * The serializer supports both efficient specific record serialization 
via for
--- End diff --

nit: "via for"


> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Priority: Major
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
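The overhead described in the issue — embedding the JSON schema with every GenericRecord — can be illustrated with a small pure-JDK sketch. This is illustrative only: the schema string and the 8-byte record payload are made-up stand-ins, not real Avro encodings.

```java
import java.nio.charset.StandardCharsets;

/**
 * Illustrative only: compares bytes written when a schema string is
 * embedded with every record vs. shipped once per stream.
 */
public class SchemaOverheadSketch {

    // Hypothetical stand-ins, not real Avro artifacts.
    static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
      + "[{\"name\":\"id\",\"type\":\"long\"}]}";
    static final int RECORD_PAYLOAD_BYTES = 8; // one long field

    /** Bytes on the wire when each record carries the schema. */
    static long bytesWithSchemaPerRecord(int numRecords) {
        int schemaBytes = SCHEMA_JSON.getBytes(StandardCharsets.UTF_8).length;
        return (long) numRecords * (schemaBytes + RECORD_PAYLOAD_BYTES);
    }

    /** Bytes when the schema travels once (e.g. inside the TypeInformation). */
    static long bytesWithSchemaShippedOnce(int numRecords) {
        int schemaBytes = SCHEMA_JSON.getBytes(StandardCharsets.UTF_8).length;
        return schemaBytes + (long) numRecords * RECORD_PAYLOAD_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("per-record: " + bytesWithSchemaPerRecord(1_000_000)
            + ", once: " + bytesWithSchemaShippedOnce(1_000_000));
    }
}
```

With a million records, the per-record variant pays the schema's size a million times over; shipping it once reduces the wire format to essentially the raw payload.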


[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238832#comment-16238832
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924463
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) 
throws IOException {
return this.reader.read(reuse, this.decoder);
}
 
+   // ------------------------------------------------------------------------
+   //  Copying
+   // ------------------------------------------------------------------------
+
@Override
-   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+   public T copy(T from) {
checkAvroInitialized();
+   return avroData.deepCopy(schema, from);
+   }
 
-   if (this.deepCopyInstance == null) {
-   this.deepCopyInstance = 
InstantiationUtil.instantiate(type, Object.class);
-   }
-
-   this.decoder.setIn(source);
-   this.encoder.setOut(target);
+   @Override
+   public T copy(T from, T reuse) {
+   return copy(from);
+   }
 
-   T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-   this.writer.write(tmp, this.encoder);
+   @Override
+   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+   T value = deserialize(source);
+   serialize(value, target);
}
 
-   private void checkAvroInitialized() {
-   if (this.reader == null) {
-   this.reader = new ReflectDatumReader(type);
-   this.writer = new ReflectDatumWriter(type);
-   this.encoder = new DataOutputEncoder();
-   this.decoder = new DataInputDecoder();
+   // ------------------------------------------------------------------------
+   //  Compatibility and Upgrades
+   // ------------------------------------------------------------------------
+
+   @Override
+   public TypeSerializerConfigSnapshot snapshotConfiguration() {
+   if (configSnapshot == null) {
+   checkAvroInitialized();
+   configSnapshot = new 
AvroSchemaSerializerConfigSnapshot(schema.toString(false));
}
+   return configSnapshot;
}
 
-   private void checkKryoInitialized() {
-   if (this.kryo == null) {
-   this.kryo = new Kryo();
-
-   Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
-   
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-   kryo.setInstantiatorStrategy(instantiatorStrategy);
+   @Override
+   @SuppressWarnings("deprecation")
+   public CompatibilityResult 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+   if (configSnapshot instanceof 
AvroSchemaSerializerConfigSnapshot) {
+   // proper schema snapshot, can do the sophisticated 
schema-based compatibility check
+   final String schemaString = 
((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
+   final Schema lastSchema = new 
Schema.Parser().parse(schemaString);
 
-   kryo.setAsmEnabled(true);
+   final SchemaPairCompatibility compatibility =
+   
SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
 
-   KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
+   return compatibility.getType() == 
SchemaCompatibilityType.COMPATIBLE ?
+   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
+   }
+   else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
+   // old snapshot case, just compare the type
+   // we don't need to restore any Kryo stuff, since Kryo 
was never used for persistence,
+   // only for object-to-object copies.
+   final AvroSerializerConfigSnapshot old = 
(AvroSerializerConfigSnapshot) configSnapshot;
+   return type.equals(old.getTypeClass()) ?
+   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
+   
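The ensureCompatibility logic above delegates the real work to Avro's SchemaCompatibility. As a rough mental model only — this toy check ignores type promotion, aliases, unions, and everything else in Avro's actual schema-resolution rules — the core rule for added fields looks like:

```java
import java.util.Map;
import java.util.Set;

/**
 * Toy stand-in for Avro's SchemaCompatibility: a "schema" here is just a
 * map of field name -> hasDefault. NOT Avro's actual resolution algorithm.
 */
public class ToyCompat {

    /**
     * A reader can decode the writer's data if every reader field is either
     * present in the writer's schema or has a default value; otherwise the
     * serializer would report that state migration is required.
     */
    static boolean readerCanRead(Map<String, Boolean> readerFields,
                                 Set<String> writerFields) {
        for (Map.Entry<String, Boolean> field : readerFields.entrySet()) {
            boolean writtenByWriter = writerFields.contains(field.getKey());
            boolean hasDefault = field.getValue();
            if (!writtenByWriter && !hasDefault) {
                return false; // missing field with no default -> incompatible
            }
        }
        return true;
    }
}
```

This mirrors why the snippet maps SchemaCompatibilityType.COMPATIBLE to CompatibilityResult.compatible() and everything else to requiresMigration().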

[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238829#comment-16238829
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924503
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
+import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An Avro serializer that can switch back to a KryoSerializer or a Pojo 
Serializer, if
+ * it has to ensure compatibility with one of those.
+ *
+ * @param  The type to be serialized.
+ */
+public class BackwardsCompatibleAvroSerializer extends 
TypeSerializer {
+
+   private static final long serialVersionUID = 1L;
+
+   /** The type to serialize. */
+   private final Class type;
+
+   /** The type serializer currently used. Avro by default. */
+   private TypeSerializer serializer;
+
+   /**
+* Creates a new backwards-compatible Avro Serializer, for the given 
type.
+*/
+   public BackwardsCompatibleAvroSerializer(Class type) {
--- End diff --

nit: Call the other constructor?




[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238830#comment-16238830
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924447
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) 
throws IOException {
return this.reader.read(reuse, this.decoder);
}
 
+   // ------------------------------------------------------------------------
+   //  Copying
+   // ------------------------------------------------------------------------
+
@Override
-   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+   public T copy(T from) {
checkAvroInitialized();
+   return avroData.deepCopy(schema, from);
+   }
 
-   if (this.deepCopyInstance == null) {
-   this.deepCopyInstance = 
InstantiationUtil.instantiate(type, Object.class);
-   }
-
-   this.decoder.setIn(source);
-   this.encoder.setOut(target);
+   @Override
+   public T copy(T from, T reuse) {
+   return copy(from);
+   }
 
-   T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-   this.writer.write(tmp, this.encoder);
+   @Override
+   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+   T value = deserialize(source);
+   serialize(value, target);
}
 
-   private void checkAvroInitialized() {
-   if (this.reader == null) {
-   this.reader = new ReflectDatumReader(type);
-   this.writer = new ReflectDatumWriter(type);
-   this.encoder = new DataOutputEncoder();
-   this.decoder = new DataInputDecoder();
+   // ------------------------------------------------------------------------
+   //  Compatibility and Upgrades
+   // ------------------------------------------------------------------------
+
+   @Override
+   public TypeSerializerConfigSnapshot snapshotConfiguration() {
+   if (configSnapshot == null) {
+   checkAvroInitialized();
+   configSnapshot = new 
AvroSchemaSerializerConfigSnapshot(schema.toString(false));
}
+   return configSnapshot;
}
 
-   private void checkKryoInitialized() {
-   if (this.kryo == null) {
-   this.kryo = new Kryo();
-
-   Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
-   
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-   kryo.setInstantiatorStrategy(instantiatorStrategy);
+   @Override
+   @SuppressWarnings("deprecation")
+   public CompatibilityResult 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+   if (configSnapshot instanceof 
AvroSchemaSerializerConfigSnapshot) {
+   // proper schema snapshot, can do the sophisticated 
schema-based compatibility check
+   final String schemaString = 
((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
+   final Schema lastSchema = new 
Schema.Parser().parse(schemaString);
 
-   kryo.setAsmEnabled(true);
+   final SchemaPairCompatibility compatibility =
+   
SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
 
-   KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
+   return compatibility.getType() == 
SchemaCompatibilityType.COMPATIBLE ?
+   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
+   }
+   else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
+   // old snapshot case, just compare the type
+   // we don't need to restore any Kryo stuff, since Kryo 
was never used for persistence,
+   // only for object-to-object copies.
+   final AvroSerializerConfigSnapshot old = 
(AvroSerializerConfigSnapshot) configSnapshot;
+   return type.equals(old.getTypeClass()) ?
+   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
+   

[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238833#comment-16238833
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924346
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -18,118 +18,111 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
 import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.formats.avro.utils.DataInputDecoder;
 import org.apache.flink.formats.avro.utils.DataOutputEncoder;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
 
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.avro.generic.GenericData;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
+import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * General purpose serialization. Currently using Apache Avro's 
Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
+ * A serializer that serializes types via Avro.
  *
- * @param  The type serialized.
+ * The serializer supports both efficient specific record serialization 
via for
+ * types generated via Avro, as well as serialization via reflection
+ * (ReflectDatumReader / -Writer). The serializer instantiated the types 
depending on
--- End diff --

nit: "instantiated" => "instantiates"




[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238834#comment-16238834
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924545
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
+import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An Avro serializer that can switch back to a KryoSerializer or a Pojo 
Serializer, if
+ * it has to ensure compatibility with one of those.
+ *
+ * @param  The type to be serialized.
+ */
+public class BackwardsCompatibleAvroSerializer extends 
TypeSerializer {
+
+   private static final long serialVersionUID = 1L;
+
+   /** The type to serialize. */
+   private final Class type;
+
+   /** The type serializer currently used. Avro by default. */
+   private TypeSerializer serializer;
+
+   /**
+* Creates a new backwards-compatible Avro Serializer, for the given 
type.
+*/
+   public BackwardsCompatibleAvroSerializer(Class type) {
+   this.type = type;
+   this.serializer = new AvroSerializer<>(type);
+   }
+
+   /**
+* Private copy constructor.
+*/
+   private BackwardsCompatibleAvroSerializer(Class type, 
TypeSerializer serializer) {
+   this.type = type;
+   this.serializer = serializer;
+   }
+
+   // ------------------------------------------------------------------------
+   //  Properties
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isImmutableType() {
+   return serializer.isImmutableType();
+   }
+
+   @Override
+   public int getLength() {
+   return serializer.getLength();
+   }
+
+   // ------------------------------------------------------------------------
+   //  Serialization
+   // ------------------------------------------------------------------------
+
+   @Override
+   public T createInstance() {
+   return serializer.createInstance();
+   }
+
+   @Override
+   public void serialize(T value, DataOutputView target) throws 
IOException {
+   serializer.serialize(value, target);
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   return serializer.deserialize(source);
+   }
+
+   @Override
+   public T deserialize(T reuse, DataInputView source) throws IOException {
+   return serializer.deserialize(reuse, source);
+   }
+
+   // ------------------------------------------------------------------------

[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238287#comment-16238287
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/4943

[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State 

## What is the purpose of the change

This changes Avro types to be serialized with a proper Avro serializer. The 
Avro serializer efficiently handles both Specific Records (generated by Avro) 
and Avro-reflection-based serialization.

In order to maintain backwards compatibility, Avro type info generates 
actually a wrapping serializer that falls back to a Pojo (or Kryo) serializer 
when being reconfigured from an old snapshot.

## Brief change log

  - Adds a proper Avro type serializer
  - Adds a backwards-compatible Avro serializer that falls back to 
Pojo/Kryo on old snapshots
  - Adds a bunch of tests

## Verifying this change

  - Using Avro specific record types in the program and enjoying nice, 
performant execution ;-)
  - Using Avro for Flink state and getting it serialized via Avro, allowing 
a schema upgrade of state
  - Running the added unit tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no)**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no)**
  - The serializers: **(yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no)**
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink use_proper_avro

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4943.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4943


commit dd05a3bf3471702ac8c9129d2d80f2feeca0f949
Author: Stephan Ewen 
Date:   2017-11-03T13:47:33Z

[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State

This falls back to the original serializer (Pojo / Kryo) in cases where
an old snapshot is resumed.






[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-03-14 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924180#comment-15924180
 ] 

Robert Metzger commented on FLINK-6022:
---

There is actually a way to register anything serializable with the execution 
config: "setGlobalJobParameters(GlobalJobParameters params)". The main use 
case for that is showing the job parameters in the web frontend (the 
ParameterTool has support for that as well).
Also, the GlobalJobParameters are accessible everywhere in the user code (when 
using the Rich* variants).
Having said all this, I would NOT recommend using the GlobalJobParameters for 
the Avro serializer.

The much more appropriate place for shipping some serialized data (that is 
specific to a serializer) from the user APIs to the cluster are the 
TypeInformations.

By putting the schema of the generic records into the {{AvroTypeInfo}} (or 
something similar for GenericAvroRecords), you'll have the schema available on 
all serializers.
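A minimal sketch of that idea — type information carrying the schema string to every serializer instance via Java serialization. The class and method names here are hypothetical illustrations, not Flink's actual AvroTypeInfo API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
 * Hypothetical carrier: ships the Avro schema JSON to the cluster as part of
 * the (serializable) type information, so each serializer instance can parse
 * it once instead of reading it from every record.
 */
public class SchemaCarryingTypeInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String schemaJson;

    public SchemaCarryingTypeInfo(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    public String getSchemaJson() {
        return schemaJson;
    }

    /**
     * Round-trips through Java serialization, as happens when type
     * information is shipped with the job graph.
     */
    public static SchemaCarryingTypeInfo roundTrip(SchemaCarryingTypeInfo info) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(info);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SchemaCarryingTypeInfo) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```

The point is simply that a String field survives the serialization round trip unchanged, so every serializer created from the restored type information sees the same schema.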



[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-03-13 Thread Billy Newport (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922810#comment-15922810
 ] 

Billy Newport commented on FLINK-6022:
--

Thanks for this, Robert. Basically, to help with the ones we've implemented, 
we'd need a way of registering our schema objects on the ExecutionConfig and 
then looking them up on deserialization; a one-off call when the 
ExecutionConfig is inflated would work as well. To be honest, all we'd really 
need is a way of registering a map of serializable state on the 
ExecutionConfig.

We are a little different than most I think in that we deal exclusively with 
GenericRecords with predeclared schemas, no code gened POJOs at all. We've 
kicked off the internal process of contributing so hopefully myself or Regina 
Chan (also here) can help contribute to this.



[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-03-10 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904920#comment-15904920
 ] 

Robert Metzger commented on FLINK-6022:
---

I think protobuf always works with generated serializers with a fixed schema 
(that's the common case for Avro as well), so I don't think there's a need to 
add support there.
For Thrift I don't know.

I haven't heard any complaints for Thrift and Protobuf. For Avro this issue has 
come up with at least two users.



[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-03-10 Thread Flavio Pompermaier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904899#comment-15904899
 ] 

Flavio Pompermaier commented on FLINK-6022:
---

Hi [~rmetzger], does this apply also to Thrift and Protobuf?
Should we open an issue for them as well?
