[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3663




[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113439992
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
+
+   /**
+    * Schema for deterministic field order.
+    */
+   private final Schema schema;
+
+   /**
+    * Reader that deserializes a byte array into a record.
+    */
+   private final DatumReader<GenericRecord> datumReader;
+
+   /**
+    * Input stream to read the message from.
+    */
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+    * Avro decoder that decodes binary data.
+    */
+   private final Decoder decoder;
+
+   /**
+    * Record to deserialize the byte array to.
+    */
+   private GenericRecord record;
+
+   /**
+    * Creates an Avro deserialization schema for the given record.
+    *
+    * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
+    */
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+      Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+      this.schema = SpecificData.get().getSchema(recordClazz);
+      this.datumReader = new ReflectDatumReader<>(schema);
+      this.record = new GenericData.Record(schema);
+      this.inputStream = new MutableByteArrayInputStream();
+      this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+      // read record
+      try {
+         inputStream.setBuffer(message);
+         this.record = datumReader.read(record, decoder);
+      } catch (IOException e) {
+         throw new RuntimeException("Failed to deserialize Row.", e);
+      }
+
+      // convert to row
+      final Object row = convertToRow(schema, record);
+      return (Row) row;
+   }
+
+   /**
+    * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
+    * Avro's {@link Utf8} fields are converted into regular Java strings.
+    */
+   private static Object convertToRow(Schema schema, Object recordObj) {
+      if (recordObj instanceof GenericRecord) {
+         // records can be wrapped in a union
+         if (schema.getType() == Schema.Type.UNION) {
+            final List<Schema> types = schema.getTypes();
+            if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+               schema = types.get(1);
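The constructor above wires the Avro decoder to a MutableByteArrayInputStream, which the quoted diff does not show. A minimal sketch of what such a class could look like, assuming it simply re-points the stream at each new message buffer so the stream and the decoder can be reused across messages (a reconstruction for illustration, not the PR's actual file):

```
import java.io.ByteArrayInputStream;

/**
 * A ByteArrayInputStream whose buffer can be swapped, so one stream
 * (and the Avro decoder wrapping it) is reused for every message.
 */
public class MutableByteArrayInputStream extends ByteArrayInputStream {

	public MutableByteArrayInputStream() {
		super(new byte[0]);
	}

	/**
	 * Points the stream at a new buffer and resets the read position.
	 */
	public void setBuffer(byte[] buf) {
		this.buf = buf;
		this.pos = 0;
		this.count = buf.length;
	}
}
```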

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113437414
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113417751
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+   /**
+    * Avro serialization schema.
+    */
+   private final Schema schema;
+
+   /**
+    * Writer to serialize an Avro record into a byte array.
+    */
+   private final DatumWriter<GenericRecord> datumWriter;
+
+   /**
+    * Output stream to serialize records into a byte array.
+    */
+   private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+
+   /**
+    * Low-level class for serialization of Avro values.
+    */
+   private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+    * Creates an Avro serialization schema for the given record class.
+    *
+    * @param recordClazz Avro record class used to serialize Flink's row to Avro's record
+    */
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+      Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+      this.schema = SpecificData.get().getSchema(recordClazz);
+      this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+      // convert to record
+      final Object record = convertToRecord(schema, row);
+
+      // write
+      try {
+         arrayOutputStream.reset();
+         datumWriter.write((GenericRecord) record, encoder);
+         encoder.flush();
+         return arrayOutputStream.toByteArray();
+      } catch (IOException e) {
+         throw new RuntimeException("Failed to serialize Row.", e);
+      }
+   }
+
+   /**
+    * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+    * Strings are converted into Avro's {@link Utf8} fields.
+    */
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+      if (rowObj instanceof Row) {
+         // records can be wrapped in a union
+         if (schema.getType() == Schema.Type.UNION) {
+            final List<Schema> types = schema.getTypes();
+            if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+               schema = types.get(1);
+            }
+            else {
+               throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
+            }
+         } else if

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113416949
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113416754
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---
+            if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

Ah, OK. I see. Then let's keep it :-)



[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113415863
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---
+            if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

Thanks, I added the case for the reverse order. This code is needed for nullable records in order to access the schema.
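A sketch of what handling both union orders could look like, as a helper factored out of convertToRecord (the name and shape are hypothetical, based on this comment rather than the merged code, and it assumes the imports of the class above):

```
// Hypothetical helper: unwraps a nullable record schema, i.e. a union of
// exactly one NULL branch and one RECORD branch, accepting either order.
// Assumes it is only called when schema.getType() == Schema.Type.UNION.
private static Schema unwrapNullableRecord(Schema schema) {
	final List<Schema> types = schema.getTypes();
	if (types.size() == 2) {
		if (types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
			return types.get(1); // UNION[null, RECORD]
		} else if (types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) {
			return types.get(0); // UNION[RECORD, null]
		}
	}
	throw new RuntimeException(
		"Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
}
```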


[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113207820
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+    * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
+    *
+    * @param topic      Kafka topic to consume.
+    * @param properties Properties for the Kafka consumer.
+    * @param record     Avro specific record.
+    */
+   KafkaAvroTableSource(
+      String topic,
+      Properties properties,
+      Class<? extends SpecificRecordBase> record) {
+
+      super(
+         topic,
+         properties,
+         createDeserializationSchema(record),
+         createFieldNames(record),
+         createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+      return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+    * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
+    * Replaces generic Utf8 with basic String type information.
+    */
+   private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
+      if (schema.getType() == Schema.Type.RECORD) {
+         final List<Schema.Field> fields = schema.getFields();
+         final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
+
+         final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
+         final String[] names = new String[fields.size()];
+         for (int i = 0; i < fields.size(); i++) {
+            final Schema.Field field = fields.get(i);
+            types[i] = convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
+            names[i] = field.name();
+         }
+         return new RowTypeInfo(types, names);
+      } else if (extracted instanceof GenericTypeInfo) {
+         final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
+         if (genericTypeInfo.getTypeClass() == Utf8.class) {
+            return BasicTypeInfo.STRING_TYPE_INFO;
+         }
+      }
+      return extracted;
+   }
+
+   private static TypeInformation<?>[] createFieldTypes(Class<? extends SpecificRecordBase> record) {
--- End diff --

This method can be removed 

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113236267
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113161806
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java ---
+   /**
+    * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
+    * Replaces generic Utf8 with basic String type information.
+    */
+   private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
--- End diff --

Change this to
```
private static TypeInformation<Row> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema)
```

and factor out the recursive logic to a method
```
convertToTypeInformation(TypeInformation<?> extracted, Schema schema)
```
?
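One way the suggested split could look, as a sketch (the signatures, generics, and the convertToTypeInformation name are assumptions based on this comment, not the merged code; it assumes the imports and context of the quoted KafkaAvroTableSource):

```
// Public entry point: the top level must be a RECORD, so the result is a RowTypeInfo.
private static TypeInformation<Row> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
	return (RowTypeInfo) convertToTypeInformation(extracted, schema);
}

// Recursion over nested fields, replacing generic Utf8 with String type info.
private static TypeInformation<?> convertToTypeInformation(TypeInformation<?> extracted, Schema schema) {
	if (schema.getType() == Schema.Type.RECORD) {
		final List<Schema.Field> fields = schema.getFields();
		final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
		final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
		final String[] names = new String[fields.size()];
		for (int i = 0; i < fields.size(); i++) {
			final Schema.Field field = fields.get(i);
			types[i] = convertToTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
			names[i] = field.name();
		}
		return new RowTypeInfo(types, names);
	} else if (extracted instanceof GenericTypeInfo) {
		final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
		if (genericTypeInfo.getTypeClass() == Utf8.class) {
			return BasicTypeInfo.STRING_TYPE_INFO;
		}
	}
	return extracted;
}
```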





[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241182
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+      Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+      this.schema = SpecificData.get().getSchema(recordClazz);
+      this.datumReader = new ReflectDatumReader<>(schema);
+      this.record = new GenericData.Record(schema);
--- End diff --

We can use a specific record here. We have the class for it.
```
this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
```
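Taken together with the "GenericRecord to SpecificRecord" comments below, the field and constructor would then look roughly like this (a sketch of the suggested shape, not the merged code):

```
// Field types changed from GenericRecord to SpecificRecord.
private DatumReader<SpecificRecord> datumReader;
private SpecificRecord record;

@SuppressWarnings("unchecked")
public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
	Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
	this.schema = SpecificData.get().getSchema(recordClazz);
	this.datumReader = new ReflectDatumReader<>(schema);
	// instantiate the concrete record class instead of a generic record
	this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
	this.inputStream = new MutableByteArrayInputStream();
	this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
```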




[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241221
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---
+   /**
+    * Reader that deserializes a byte array into a record.
+    */
+   private final DatumReader<GenericRecord> datumReader;
--- End diff --

`GenericRecord` -> `SpecificRecord`




[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113243664
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241700
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---
+import org.apache.avro.generic.GenericData;
--- End diff --

Change all `GenericRecord` to `SpecificRecord`




[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113244628
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---
+            if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

See comment on UNION in deserializer



[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113237296
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link
SpecificRecord} into Avro bytes.
+ */
+public class AvroRowSerializationSchema implements 
SerializationSchema {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream = new
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates an Avro serialization schema for the given record class.
+*
+* @param recordClazz Avro record class used to serialize Flink's row to
Avro's record
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

This limitation exists because the Table API cannot handle UNION types 
either, right?
Isn't this the same as having a nullable record field?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113179472
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Class of the Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
+   if (schema.getType() == Schema.Type.RECORD) {
+   final List fields = schema.getFields();
+   final AvroTypeInfo avroTypeInfo = (AvroTypeInfo) 
extracted;
+
+   final TypeInformation[] types = new 
TypeInformation[fields.size()];
+   final String[] names = new String[fields.size()];
+   for (int i = 0; i < fields.size(); i++) {
+   final Schema.Field field = fields.get(i);
+   types[i] = 
convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), 
field.schema());
+   names[i] = field.name();
+   }
+   return new RowTypeInfo(types, names);
+   } else if (extracted instanceof GenericTypeInfo) {
+   final GenericTypeInfo genericTypeInfo = 
(GenericTypeInfo) extracted;
+   if (genericTypeInfo.getTypeClass() == Utf8.class) {
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   }
+   }
+   return extracted;
+   }
+
+   private static  TypeInformation[] 
createFieldTypes(Class record) {
+   final AvroTypeInfo avroTypeInfo = new 

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113207375
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Class of the Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
+   if (schema.getType() == Schema.Type.RECORD) {
+   final List fields = schema.getFields();
+   final AvroTypeInfo avroTypeInfo = (AvroTypeInfo) 
extracted;
+
+   final TypeInformation[] types = new 
TypeInformation[fields.size()];
+   final String[] names = new String[fields.size()];
+   for (int i = 0; i < fields.size(); i++) {
+   final Schema.Field field = fields.get(i);
+   types[i] = 
convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), 
field.schema());
+   names[i] = field.name();
+   }
+   return new RowTypeInfo(types, names);
+   } else if (extracted instanceof GenericTypeInfo) {
+   final GenericTypeInfo genericTypeInfo = 
(GenericTypeInfo) extracted;
+   if (genericTypeInfo.getTypeClass() == Utf8.class) {
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   }
+   }
+   return extracted;
+   }
+
+   private static  TypeInformation[] 
createFieldTypes(Class record) {
+   final AvroTypeInfo avroTypeInfo = new 

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113179453
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Class of the Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
+   if (schema.getType() == Schema.Type.RECORD) {
+   final List fields = schema.getFields();
+   final AvroTypeInfo avroTypeInfo = (AvroTypeInfo) 
extracted;
+
+   final TypeInformation[] types = new 
TypeInformation[fields.size()];
+   final String[] names = new String[fields.size()];
+   for (int i = 0; i < fields.size(); i++) {
+   final Schema.Field field = fields.get(i);
+   types[i] = 
convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), 
field.schema());
+   names[i] = field.name();
+   }
+   return new RowTypeInfo(types, names);
+   } else if (extracted instanceof GenericTypeInfo) {
+   final GenericTypeInfo genericTypeInfo = 
(GenericTypeInfo) extracted;
+   if (genericTypeInfo.getTypeClass() == Utf8.class) {
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   }
+   }
+   return extracted;
+   }
+
+   private static  TypeInformation[] 
createFieldTypes(Class record) {
--- End diff --

`record` -> `avroClass`?
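For illustration, the mapping produced by the quoted
convertToRowTypeInformation for a flat record. This is a sketch assuming an
Avro record with one string and one int field; the field names are
illustrative:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;

    // An Avro record {name: string, age: int} becomes a RowTypeInfo in
    // which the Utf8-backed string field is replaced by Flink's String type.
    TypeInformation<?>[] types = new TypeInformation<?>[] {
        BasicTypeInfo.STRING_TYPE_INFO, // Avro "string" (Utf8) -> String
        BasicTypeInfo.INT_TYPE_INFO     // Avro "int"
    };
    String[] names = new String[] {"name", "age"};
    RowTypeInfo rowType = new RowTypeInfo(types, names);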



[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113244339
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates an Avro deserialization schema for the given record class.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   // read record
+   try {
+   inputStream.setBuffer(message);
+   this.record = datumReader.read(record, decoder);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+
+   // convert to row
+   final Object row = convertToRow(schema, record);
+   return (Row) row;
+   }
+
+   /**
+* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
+* Avro's {@link Utf8} fields are converted into regular Java strings.
+*/
+   private static Object convertToRow(Schema schema, Object recordObj) {
+   if (recordObj instanceof GenericRecord) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
--- End diff --

Not sure if we should support `UNION` at all. 
If you have a UNION[NULL, RECORD] field in Avro, you'd expect it to be
represented as a UNION field in a Table as well.
We change it here to a nullable Record field. Not sure if that's expected.

Should 
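A sketch of what the conversion under discussion amounts to at runtime. The
helper below is illustrative, not the PR's code; it reuses the convertToRow
method from the quoted class:

    import org.apache.avro.Schema;

    // For a UNION[NULL, RECORD] field: a null value maps to a null Row,
    // a record value is converted using the RECORD branch of the union.
    static Object convertNullableRecord(Schema unionSchema, Object value) {
        if (value == null) {
            return null; // NULL branch of the union
        }
        final Schema recordSchema = unionSchema.getTypes().get(1);
        return convertToRow(recordSchema, value); // nested Flink Row
    }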

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241294
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
--- End diff --

`GenericRecord` -> `SpecificRecord`
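The field is mutable because Avro's reader API supports object reuse; a
minimal sketch of the idiom:

    import java.io.IOException;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;

    // read(reuse, decoder) may fill and return the passed-in instance
    // instead of allocating a fresh record per message, so the field that
    // holds the record cannot be final.
    static GenericRecord readReusing(
            DatumReader<GenericRecord> reader,
            GenericRecord reuse,
            Decoder decoder) throws IOException {
        return reader.read(reuse, decoder);
    }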


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113243381
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates an Avro deserialization schema for the given record class.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   // read record
+   try {
+   inputStream.setBuffer(message);
+   this.record = datumReader.read(record, decoder);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+
+   // convert to row
+   final Object row = convertToRow(schema, record);
+   return (Row) row;
+   }
+
+   /**
+* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
+* Avro's {@link Utf8} fields are converted into regular Java strings.
+*/
+   private static Object convertToRow(Schema schema, Object recordObj) {
+   if (recordObj instanceof GenericRecord) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+   schema = types.get(1);
+   

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113236676
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link
SpecificRecord} into Avro bytes.
+ */
+public class AvroRowSerializationSchema implements 
SerializationSchema {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream = new
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates an Avro serialization schema for the given record class.
+*
+* @param recordClazz Avro record class used to serialize Flink's row to
Avro's record
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

Are union types always ordered? Could it happen that type `0` is `RECORD`
and `1` is `NULL`?
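Both orderings are valid Avro schemas, which is what makes a positional
check fragile. A small illustration:

    import org.apache.avro.Schema;

    // Avro accepts a nullable string union in either order, so code that
    // assumes types.get(0) is NULL would miss the second form.
    Schema nullFirst = new Schema.Parser().parse("[\"null\", \"string\"]");
    Schema nullLast = new Schema.Parser().parse("[\"string\", \"null\"]");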


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-03 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/3663

[FLINK-3871] [table] Add Kafka TableSource with Avro serialization

This PR adds KafkaAvroTableSource. It serializes/deserializes (nested) Avro 
records to (nested) Flink rows. Avro Utf8 strings are converted to regular Java 
strings.
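A hedged usage sketch of the new table source. The version-specific
subclass name Kafka010AvroTableSource and the generated record class User
are illustrative assumptions, not taken from this PR; tableEnv is an
existing StreamTableEnvironment:

    import java.util.Properties;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "demo");

    // Hypothetical version-specific subclass of KafkaAvroTableSource.
    KafkaAvroTableSource source =
        new Kafka010AvroTableSource("user-topic", props, User.class);

    // Register the Avro-backed topic as a table and query it with SQL.
    tableEnv.registerTableSource("Users", source);
    Table users = tableEnv.sql("SELECT name FROM Users");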

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-3871

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3663.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3663


commit 589e45c5c50c328783f71d219c6606e972f42f34
Author: twalthr 
Date:   2017-04-03T12:44:46Z

[FLINK-3871] [table] Add Kafka TableSource with Avro serialization




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---