[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-09 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/4943


---


[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-05 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148977803
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
+import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An Avro serializer that can switch back to a KryoSerializer or a Pojo Serializer, if
+ * it has to ensure compatibility with one of those.
+ *
+ * @param <T> The type to be serialized.
+ */
+public class BackwardsCompatibleAvroSerializer<T extends SpecificRecordBase> extends TypeSerializer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The type to serialize. */
+	private final Class<T> type;
+
+	/** The type serializer currently used. Avro by default. */
+	private TypeSerializer<T> serializer;
+
+	/**
+	 * Creates a new backwards-compatible Avro Serializer, for the given type.
+	 */
+	public BackwardsCompatibleAvroSerializer(Class<T> type) {
+		this.type = type;
+		this.serializer = new AvroSerializer<>(type);
+	}
+
+	/**
+	 * Private copy constructor.
+	 */
+	private BackwardsCompatibleAvroSerializer(Class<T> type, TypeSerializer<T> serializer) {
+		this.type = type;
+		this.serializer = serializer;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean isImmutableType() {
+		return serializer.isImmutableType();
+	}
+
+	@Override
+	public int getLength() {
+		return serializer.getLength();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Serialization
+	// ------------------------------------------------------------------------
+
+	@Override
+	public T createInstance() {
+		return serializer.createInstance();
+	}
+
+	@Override
+	public void serialize(T value, DataOutputView target) throws IOException {
+		serializer.serialize(value, target);
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		return serializer.deserialize(source);
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return serializer.deserialize(reuse, source);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Copying
+	// ------------------------------------------------------------------------
+
+	@Override
+	public T copy(T from) {
+		return serializer.copy(from);
+	}
+
+	@Override
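The class quoted above is a pure forwarding wrapper: every `TypeSerializer` method delegates to the currently selected inner serializer, which can be swapped out when an old Kryo or Pojo snapshot is restored. A minimal stand-in sketch of that pattern, using a hypothetical `Codec` interface rather than Flink's real `TypeSerializer`:

```java
// Sketch of the forwarding-wrapper pattern from BackwardsCompatibleAvroSerializer.
// "Codec", "Utf8Codec" and "BackwardsCompatibleCodec" are hypothetical names,
// not Flink classes; the point is only the delegate-and-swap structure.
public class ForwardingCodec {

    interface Codec {
        byte[] encode(String s);
        String decode(byte[] b);
    }

    static class Utf8Codec implements Codec {
        public byte[] encode(String s) {
            return s.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
        public String decode(byte[] b) {
            return new String(b, java.nio.charset.StandardCharsets.UTF_8);
        }
    }

    // Wrapper that forwards every call and can swap its delegate,
    // e.g. when restoring state written by an older codec.
    static class BackwardsCompatibleCodec implements Codec {
        private Codec delegate = new Utf8Codec();   // default, like "Avro by default"

        public byte[] encode(String s) { return delegate.encode(s); }
        public String decode(byte[] b) { return delegate.decode(b); }

        void fallBackTo(Codec older) { delegate = older; }
    }
}
```

The design choice mirrored here is that callers keep one stable object identity while the actual implementation behind it can change after a compatibility check.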

[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-05 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148977780
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---
@@ -0,0 +1,215 @@
+	public BackwardsCompatibleAvroSerializer(Class<T> type) {
--- End diff --

The other constructor is a copy constructor, would rather not call it...


---
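The reply above concerns whether the public constructor should chain to the private copy constructor via `this(...)`. A neutral sketch of the two styles, using a hypothetical `DelegatingSerializer` class rather than the Flink one:

```java
// Hypothetical illustration of the constructor-chaining question discussed above.
// "DelegatingSerializer" is not a Flink class; UnaryOperator<String> stands in
// for a real serializer delegate.
import java.util.function.UnaryOperator;

public class DelegatingSerializer {

    private final Class<?> type;
    private final UnaryOperator<String> serializer;

    // Style suggested in the review would be:
    //   public DelegatingSerializer(Class<?> type) { this(type, defaultFor(type)); }
    //
    // Style kept in the PR: each constructor assigns its own fields, so the
    // private constructor remains a pure "wrap what I am given" copy path.
    public DelegatingSerializer(Class<?> type) {
        this.type = type;
        this.serializer = s -> s + "@" + type.getSimpleName();
    }

    private DelegatingSerializer(Class<?> type, UnaryOperator<String> serializer) {
        this.type = type;
        this.serializer = serializer;
    }

    // The copy path: reuse the existing delegate without rebuilding it.
    public DelegatingSerializer duplicate() {
        return new DelegatingSerializer(type, serializer);
    }

    public String apply(String s) {
        return serializer.apply(s);
    }
}
```

Either style compiles to the same behavior here; keeping the constructors separate simply preserves the intent that only `duplicate()` ever reuses an existing delegate.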


[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-05 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148977758
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
 		return this.reader.read(reuse, this.decoder);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Copying
+	// ------------------------------------------------------------------------
+
 	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
+	public T copy(T from) {
 		checkAvroInitialized();
+		return avroData.deepCopy(schema, from);
+	}
 
-		if (this.deepCopyInstance == null) {
-			this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
-		}
-
-		this.decoder.setIn(source);
-		this.encoder.setOut(target);
+	@Override
+	public T copy(T from, T reuse) {
+		return copy(from);
+	}
 
-		T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-		this.writer.write(tmp, this.encoder);
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		T value = deserialize(source);
+		serialize(value, target);
 	}
 
-	private void checkAvroInitialized() {
-		if (this.reader == null) {
-			this.reader = new ReflectDatumReader<T>(type);
-			this.writer = new ReflectDatumWriter<T>(type);
-			this.encoder = new DataOutputEncoder();
-			this.decoder = new DataInputDecoder();
+	// ------------------------------------------------------------------------
+	//  Compatibility and Upgrades
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		if (configSnapshot == null) {
+			checkAvroInitialized();
+			configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false));
 		}
+		return configSnapshot;
 	}
 
-	private void checkKryoInitialized() {
-		if (this.kryo == null) {
-			this.kryo = new Kryo();
-
-			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-			kryo.setInstantiatorStrategy(instantiatorStrategy);
+	@Override
+	@SuppressWarnings("deprecation")
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) {
+			// proper schema snapshot, can do the sophisticated schema-based compatibility check
+			final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
+			final Schema lastSchema = new Schema.Parser().parse(schemaString);
 
-			kryo.setAsmEnabled(true);
+			final SchemaPairCompatibility compatibility =
+					SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
 
-			KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
+			return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ?
+					CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
+		}
+		else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
+			// old snapshot case, just compare the type
+			// we don't need to restore any Kryo stuff, since Kryo was never used for persistence,
+			// only for object-to-object copies.
+			final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot;
+			return type.equals(old.getTypeClass()) ?
+					CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
+		}
+		else {
+			return CompatibilityResult.requiresMigration();
		}
	}
 
-	// ------------------------------------------------------------------------
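The rewritten `copy(DataInputView, DataOutputView)` in the hunk above is simply deserialize-then-serialize. A self-contained sketch of that round-trip, using plain `java.io` streams as stand-ins for Flink's `DataInputView`/`DataOutputView` and a single UTF string as a stand-in record type:

```java
// Sketch of the deserialize-then-serialize copy strategy from the diff above.
// java.io.DataInput/DataOutput stand in for Flink's DataInputView/DataOutputView;
// the "serializer" here just handles one UTF string, not Avro records.
import java.io.*;

public class CopyRoundTrip {

    static void serialize(String value, DataOutput target) throws IOException {
        target.writeUTF(value);
    }

    static String deserialize(DataInput source) throws IOException {
        return source.readUTF();
    }

    // Copy one serialized record from source to target, exactly as in the diff:
    // read it back into an object, then write it out again.
    static void copy(DataInput source, DataOutput target) throws IOException {
        String value = deserialize(source);
        serialize(value, target);
    }

    // Convenience helper: serialize, copy the bytes, deserialize the copy.
    static String roundTrip(String value) {
        try {
            ByteArrayOutputStream src = new ByteArrayOutputStream();
            serialize(value, new DataOutputStream(src));

            ByteArrayOutputStream dst = new ByteArrayOutputStream();
            copy(new DataInputStream(new ByteArrayInputStream(src.toByteArray())),
                 new DataOutputStream(dst));

            return deserialize(new DataInputStream(new ByteArrayInputStream(dst.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello")); // prints "hello"
    }
}
```

This trades the old approach (a reusable scratch instance plus wired encoder/decoder state) for simpler, stateless code at the cost of one transient object allocation per copied record.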

[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-04 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148928504
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -153,111 +146,215 @@

[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-04 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924545
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---
@@ -0,0 +1,215 @@

[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-04 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924463
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -153,111 +146,215 @@

[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-04 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924503
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java ---
@@ -0,0 +1,215 @@
+	public BackwardsCompatibleAvroSerializer(Class<T> type) {
--- End diff --

nit: Call the other constructor?


---


[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-04 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924447
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -153,111 +146,215 @@

[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-04 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924339
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -18,118 +18,111 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
 import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.formats.avro.utils.DataInputDecoder;
 import org.apache.flink.formats.avro.utils.DataOutputEncoder;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
 
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.avro.generic.GenericData;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
+import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
+ * A serializer that serializes types via Avro.
  *
- * @param <T> The type serialized.
+ * <p>The serializer supports both efficient specific record serialization via for
--- End diff --

nit: "via for"


---
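The new imports in the hunk above (`SpecificDatumReader`/`SpecificDatumWriter` alongside the `Reflect` variants, plus `SpecificRecord`) suggest the rewritten serializer picks the specific-record path for Avro-generated classes and falls back to reflect-based serialization otherwise. A stand-in sketch of that dispatch, using a hypothetical `SpecificLike` marker interface instead of Avro's `SpecificRecord`:

```java
// Hypothetical sketch of the specific-vs-reflect dispatch implied by the imports
// above. "SpecificLike" stands in for org.apache.avro.specific.SpecificRecord;
// the real serializer would construct SpecificDatum* or ReflectDatum* accordingly.
public class DatumPathDispatch {

    interface SpecificLike { }                      // marker, like SpecificRecord

    static class GeneratedRecord implements SpecificLike { }  // "generated from a schema"
    static class PlainPojo { }                                // arbitrary reflective type

    static String pickPath(Class<?> type) {
        // mirrors: SpecificRecord.class.isAssignableFrom(type) ? specific : reflect
        return SpecificLike.class.isAssignableFrom(type) ? "specific" : "reflect";
    }
}
```

The specific path can use the schema compiled into the generated class, while the reflect path derives a schema from the fields at runtime; deciding once per serializer instance keeps the per-record hot path free of the check.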


[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-04 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4943#discussion_r148924346
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -18,118 +18,111 @@
Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
+ * A serializer that serializes types via Avro.
  *
- * @param <T> The type serialized.
+ * The serializer supports both efficient specific record serialization 
via for
+ * types generated via Avro, as well as serialization via reflection
+ * (ReflectDatumReader / -Writer). The serializer instantiated the types 
depending on
--- End diff --

nit: "instantiated" => "instantiates"


---
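The Javadoc under review says the serializer instantiates specific or reflect readers/writers depending on the type. A minimal sketch of that dispatch, assuming the Avro library on the classpath — the factory class and its name are invented for illustration, only the Avro calls are real:

```java
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;

// Hypothetical factory illustrating the dispatch: use the fast specific
// path for Avro-generated classes, reflection for everything else.
class AvroDatumFactory<T> {

    final Schema schema;
    final DatumReader<T> reader;
    final DatumWriter<T> writer;

    AvroDatumFactory(Class<T> type) {
        if (SpecificRecord.class.isAssignableFrom(type)) {
            // Avro-generated type: schema comes from the generated class
            this.schema = SpecificData.get().getSchema(type);
            this.reader = new SpecificDatumReader<>(schema);
            this.writer = new SpecificDatumWriter<>(schema);
        } else {
            // arbitrary POJO: derive the schema and (de)serialize via reflection
            this.schema = ReflectData.get().getSchema(type);
            this.reader = new ReflectDatumReader<>(schema);
            this.writer = new ReflectDatumWriter<>(schema);
        }
    }
}
```

A class produced by the Avro compiler implements `SpecificRecord` and takes the specific path; any other class falls through to `ReflectData`.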


[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-03 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/4943

[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State 

## What is the purpose of the change

This changes Avro types to be serialized with a proper Avro serializer. The 
Avro serializer efficiently handles both Specific Records (generated by Avro) 
and Avro-reflection-based serialization.

In order to maintain backwards compatibility, the Avro type info actually generates a wrapping serializer that falls back to a Pojo (or Kryo) serializer when it is reconfigured from an old snapshot.
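The fallback can be sketched roughly as follows. All types below are simplified stand-ins invented for this sketch; the real implementation lives in Flink's `TypeSerializer` / `TypeSerializerConfigSnapshot` machinery (the `BackwardsCompatibleAvroSerializer` in the diff above):

```java
// Simplified stand-ins for Flink's serializer machinery; names are
// illustrative only and do not match the real Flink API.
interface Snapshot {}
class AvroSnapshot implements Snapshot {}
class PojoSnapshot implements Snapshot {}   // written by an older Flink version

interface Serializer<T> {
    byte[] serialize(T value);
    boolean canRestoreFrom(Snapshot snapshot);
}

// Trivial serializer used to make the sketch self-contained.
class DummySerializer<T> implements Serializer<T> {
    private final String name;
    DummySerializer(String name) { this.name = name; }
    public byte[] serialize(T value) { return name.getBytes(); }
    public boolean canRestoreFrom(Snapshot s) { return s instanceof AvroSnapshot; }
}

// Wrapper that serializes via Avro by default, but detects a snapshot taken
// by the previous Pojo/Kryo serializer and keeps restoring through it.
class BackwardsCompatibleSerializer<T> implements Serializer<T> {
    private final Serializer<T> avroSerializer;
    private final Serializer<T> legacySerializer;
    private Serializer<T> active;

    BackwardsCompatibleSerializer(Serializer<T> avro, Serializer<T> legacy) {
        this.avroSerializer = avro;
        this.legacySerializer = legacy;
        this.active = avro;
    }

    public byte[] serialize(T value) {
        return active.serialize(value);
    }

    public boolean canRestoreFrom(Snapshot snapshot) {
        if (snapshot instanceof PojoSnapshot) {
            active = legacySerializer;   // old state: fall back to Pojo/Kryo
            return true;
        }
        return avroSerializer.canRestoreFrom(snapshot);
    }
}
```

New jobs never see a `PojoSnapshot`, so they stay on the Avro path; only jobs restored from old state flip to the legacy serializer.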

## Brief change log

  - Adds a proper Avro type serializer
  - Adds a backwards-compatible Avro serializer that falls back to Pojo/Kryo on old snapshots
  - Adds a bunch of tests

## Verifying this change

  - Using Avro specific record types in the program and enjoying nice, performant execution ;-)
  - Using Avro for Flink state and getting it serialized via Avro, allowing a schema upgrade of state
  - Running the added unit tests
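The "schema upgrade of state" point rests on Avro's built-in reader/writer compatibility check. A small, self-contained illustration — the record schemas are invented for the example, `SchemaCompatibility` is the real Avro API:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;

// Hypothetical upgrade check: can state written with the old (writer)
// schema be read with the new (reader) schema?
class SchemaUpgradeCheck {

    // old schema the state was written with
    static final Schema WRITER = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"}]}");

    // new schema: adds a field WITH a default, so old data is still readable
    static final Schema READER = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"age\",\"type\":\"int\",\"default\":0}]}");

    static boolean canUpgrade(Schema reader, Schema writer) {
        SchemaPairCompatibility result =
            SchemaCompatibility.checkReaderWriterCompatibility(reader, writer);
        return result.getType() == SchemaCompatibilityType.COMPATIBLE;
    }
}
```

Adding a field with a default (or removing a field) keeps the schemas compatible; adding a required field without a default does not.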

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink use_proper_avro

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4943.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4943


commit dd05a3bf3471702ac8c9129d2d80f2feeca0f949
Author: Stephan Ewen 
Date:   2017-11-03T13:47:33Z

[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State

This falls back to the original serializer (Pojo / Kryo) in cases where
an old snapshot is resumed.




---