[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-27 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r431403417



##
File path: core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AvroDataUnionRecordTest {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  protected void writeAndValidate(
+  List actualWrite,
+  List expectedRead,
+  Schema icebergSchema) throws IOException {
+File testFile = temp.newFile();
+Assert.assertTrue("Delete should succeed", testFile.delete());
+
+try (FileAppender writer = Avro.write(Files.localOutput(testFile))
+.schema(icebergSchema)
+.named("test")
+.build()) {
+  for (GenericData.Record rec : actualWrite) {
+writer.add(rec);
+  }
+}
+
+List rows;
+try (AvroIterable reader = Avro.read(Files.localInput(testFile))
+.project(icebergSchema)
+.build()) {
+  rows = Lists.newArrayList(reader);
+}
+
+for (int i = 0; i < expectedRead.size(); i += 1) {
+  AvroTestHelpers.assertEquals(icebergSchema.asStruct(), expectedRead.get(i), rows.get(i));
+}
+  }
+
+  @Test
+  public void testMapOfUnionValues() throws IOException {
+String schema1 = "{\n" +
+"  \"name\": \"MapOfUnion\",\n" +
+"  \"type\": \"record\",\n" +
+"  \"fields\": [\n" +
+"{\n" +
+"  \"name\": \"map\",\n" +
+"  \"type\": [\n" +
+"\"null\",\n" +
+"{\n" +
+"  \"type\": \"map\",\n" +
+"  \"values\": [\n" +
+"\"null\",\n" +
+"\"boolean\",\n" +
+"\"int\",\n" +
+"\"long\",\n" +
+"\"float\",\n" +
+"\"double\",\n" +
+"\"bytes\",\n" +
+"\"string\"\n" +
+"  ]\n" +
+"}\n" +
+"  ]\n" +
+"}\n" +
+"  ]\n" +
+"}";
+org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema1);
+org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
+org.apache.avro.Schema avroSchemaUnionRecord = AvroSchemaUtil.convert(icebergSchema, "test");
+org.apache.avro.Schema unionRecordSchema =
+avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1);
+
+List expectedRead = new ArrayList<>();
+List actualWrite = new ArrayList<>();
+
+for (long i = 0; i < 10; i++) {
+  Map map = new HashMap<>();
+  Map mapRead = new HashMap<>();
+  updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i);
+  GenericData.Record recordRead = new GenericRecordBuilder(avroSchema)
+  .set("map", mapRead)
+  .build();
+  GenericData.Record record = new GenericRecordBuilder(avroSchema)
+  .set("map", map)
+  .build();
+  actualWrite.add(record);
+  expectedRead.add(recordRead);
+}
+writeAndValidate(actualWrite, expectedRead, icebergSchema);
+  }
+
+  private void updateMapsForUnionSchema(
+  org.apache.avro.Schema unionRecordSchema,
+  Map map,
+  Map mapRead,
+  Long index) {
+map.put("boolean", index % 2 == 0);
+map.put("int", index.intValue());
+map.put("long", index);
+map.put("float", index.floatValue());
+map.put("double", index.doubleValue());
+map.put("bytes", 

[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-26 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r430121047



##
File path: api/src/main/java/org/apache/iceberg/types/Types.java
##
@@ -526,25 +526,25 @@ public static StructType of(List fields) {
   return new StructType(fields, false);
 }
 
-public static StructType of(List fields, boolean isUnionSchema) {
-  return new StructType(fields, isUnionSchema);
+public static StructType of(List fields, boolean convertedFromUnionSchema) {
+  return new StructType(fields, convertedFromUnionSchema);

Review comment:
   There is no other way to pass this information to `GenericAvroWriter`: it 
needs to know that the input generic record has a union schema so that the 
value writers can be created accordingly; otherwise we get a cast exception. 
This also avoids creating a new generic object with the record schema (which 
is really expensive, IMO). What alternative is possible if we don't want to 
add `convertedFromUnionSchema`?
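
To illustrate the cast exception mentioned above, here is a minimal, library-free sketch. `ValueWriter`, `StructWriter`, `UnionWriter` and `writerFor` are hypothetical stand-ins, not the actual `GenericAvroWriter` or `ValueWriters` code from the PR; the only assumption carried over is that a flag such as `convertedFromUnionSchema` selects which kind of writer gets built.

```java
import java.util.Map;

final class UnionWriterSketch {
  /** Hypothetical stand-in for a value writer; returns a description instead of encoding bytes. */
  interface ValueWriter {
    String write(Object datum);
  }

  /** Writer built for an ordinary struct: expects a record-like datum (modelled here as a Map). */
  static final class StructWriter implements ValueWriter {
    @Override
    public String write(Object datum) {
      // A raw union value such as a Long is not a record, so this cast fails at runtime.
      Map<?, ?> record = (Map<?, ?>) datum;
      return "struct with " + record.size() + " field(s)";
    }
  }

  /** Writer built for a struct that was converted from a union: accepts the raw value. */
  static final class UnionWriter implements ValueWriter {
    @Override
    public String write(Object datum) {
      // Pick the union branch from the runtime type of the value.
      String branch = datum == null ? "null" : datum.getClass().getSimpleName();
      return branch + ":" + datum;
    }
  }

  /** The flag is the only information used to choose between the two writers. */
  static ValueWriter writerFor(boolean convertedFromUnionSchema) {
    return convertedFromUnionSchema ? new UnionWriter() : new StructWriter();
  }

  public static void main(String[] args) {
    Object rawUnionValue = 42L;
    System.out.println(writerFor(true).write(rawUnionValue));
    try {
      writerFor(false).write(rawUnionValue);
    } catch (ClassCastException e) {
      System.out.println("struct writer rejected the raw union value");
    }
  }
}
```

Without the flag, the only other option in this model would be to wrap every union value in a new record before writing, which is the extra allocation the comment wants to avoid.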

##
File path: core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java
##
@@ -0,0 +1,183 @@

[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-24 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429665159



##
File path: core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
##
@@ -112,7 +112,9 @@ public Schema struct(Types.StructType struct, List fieldSchemas) {
 }
 
 recordSchema = Schema.createRecord(recordName, null, null, false, fields);
-
+if (struct.isUnionSchema()) {

Review comment:
   The only way for `GenericAvroWriter` to know whether a record represents a 
union schema type is to use the property set here. Otherwise the ValueWriters 
will fail to write a union schema record, because they will expect the new 
record schema instead of the union record schema.
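
A rough sketch of the idea, without depending on Avro or Iceberg: `StructType` and `RecordSchema` below are simplified, hypothetical stand-ins for the real classes. The property name is the `UNION_SCHEMA_TO_RECORD` constant from this PR; the value `"true"` is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UnionMarkerSketch {
  // Property name taken from AvroSchemaUtil.UNION_SCHEMA_TO_RECORD in this PR.
  static final String UNION_SCHEMA_TO_RECORD = "union-schema-to-record";

  /** Hypothetical stand-in for Types.StructType carrying the union flag. */
  static final class StructType {
    final List<String> fieldNames;
    final boolean unionSchema;

    StructType(List<String> fieldNames, boolean unionSchema) {
      this.fieldNames = fieldNames;
      this.unionSchema = unionSchema;
    }

    boolean isUnionSchema() {
      return unionSchema;
    }
  }

  /** Hypothetical stand-in for an Avro record schema with string properties. */
  static final class RecordSchema {
    final String name;
    final List<String> fieldNames;
    final Map<String, String> props = new HashMap<>();

    RecordSchema(String name, List<String> fieldNames) {
      this.name = name;
      this.fieldNames = fieldNames;
    }
  }

  /** Copies the struct flag onto the record schema as a property (value "true" is assumed). */
  static RecordSchema toRecordSchema(String recordName, StructType struct) {
    RecordSchema recordSchema = new RecordSchema(recordName, struct.fieldNames);
    if (struct.isUnionSchema()) {
      recordSchema.props.put(UNION_SCHEMA_TO_RECORD, "true");
    }
    return recordSchema;
  }

  /** What a writer could check before choosing its value writers. */
  static boolean representsUnion(RecordSchema schema) {
    // Boolean.parseBoolean returns false for a missing (null) property.
    return Boolean.parseBoolean(schema.props.get(UNION_SCHEMA_TO_RECORD));
  }

  public static void main(String[] args) {
    StructType fromUnion = new StructType(Arrays.asList("member1", "member2"), true);
    StructType plain = new StructType(Arrays.asList("id", "data"), false);
    System.out.println(representsUnion(toRecordSchema("r1", fromUnion)));
    System.out.println(representsUnion(toRecordSchema("r2", plain)));
  }
}
```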





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-24 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429664731



##
File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
##
@@ -120,7 +124,7 @@ public static boolean isTimestamptz(Schema schema) {
   }
 
   public static boolean isOptionSchema(Schema schema) {
-if (schema.getType() == UNION && schema.getTypes().size() == 2) {
+if (schema.getType() == UNION && schema.getTypes().size() >= 2) {

Review comment:
   Fixed, keeping the existing behaviour for non-union types.






[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-24 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429664072



##
File path: core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java
##
@@ -0,0 +1,183 @@

[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-22 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429488334



##
File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
##
@@ -120,7 +124,7 @@ public static boolean isTimestamptz(Schema schema) {
   }
 
   public static boolean isOptionSchema(Schema schema) {
-if (schema.getType() == UNION && schema.getTypes().size() == 2) {
+if (schema.getType() == UNION && schema.getTypes().size() >= 2) {

Review comment:
   Let me add handling to return false when it's a UNION with size > 2 and 
the first element is not null.
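
A minimal sketch of the check described above, assuming the two-branch case keeps its existing behaviour (either branch may be null) and larger unions qualify only when the first branch is null. Union branches are modelled as Avro type names in a list; `OptionSchemaSketch` is a hypothetical name and this is not the code in `AvroSchemaUtil`.

```java
import java.util.Arrays;
import java.util.List;

final class OptionSchemaSketch {
  /** Union branches are modelled as Avro type names, e.g. ["null", "int", "string"]. */
  static boolean isOptionSchema(List<String> unionTypes) {
    if (unionTypes.size() == 2) {
      // Existing behaviour: a two-branch union is an option if either branch is null.
      return "null".equals(unionTypes.get(0)) || "null".equals(unionTypes.get(1));
    }
    if (unionTypes.size() > 2) {
      // Proposed handling: larger unions qualify only when the first branch is null.
      return "null".equals(unionTypes.get(0));
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(isOptionSchema(Arrays.asList("null", "int")));
    System.out.println(isOptionSchema(Arrays.asList("int", "null")));
    System.out.println(isOptionSchema(Arrays.asList("null", "int", "string")));
    System.out.println(isOptionSchema(Arrays.asList("int", "null", "string")));
  }
}
```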






[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-22 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429487872



##
File path: api/src/main/java/org/apache/iceberg/types/Types.java
##
@@ -641,6 +650,13 @@ public int hashCode() {
   }
   return fieldsById;
 }
+
+/**
+ * @return true if struct represents union schema
+ */
+public boolean isUnionSchema() {

Review comment:
   I think this is a naming issue. This flag indicates that the field stores 
a union type by converting it to a record/struct.






[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-21 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429004177



##
File path: core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java
##
@@ -0,0 +1,183 @@

[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-21 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429003645



##
File path: core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
##
@@ -112,7 +112,9 @@ public Schema struct(Types.StructType struct, List fieldSchemas) {
 }
 
 recordSchema = Schema.createRecord(recordName, null, null, false, fields);
-
+if (struct.isUnionSchema()) {

Review comment:
   For Spark, spark-avro converts it into a record, so no changes are needed 
there. I will add a test for Spark.






[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-21 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429003506



##
File path: core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
##
@@ -112,7 +112,9 @@ public Schema struct(Types.StructType struct, List fieldSchemas) {
 }
 
 recordSchema = Schema.createRecord(recordName, null, null, false, fields);
-
+if (struct.isUnionSchema()) {

Review comment:
   This is needed when the Avro writer is a Java client that writes directly 
by consuming Avro records.






[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-21 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429003306



##
File path: core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
##
@@ -169,18 +184,15 @@ public Type primitive(Schema primitive) {
 return Types.DecimalType.of(
 ((LogicalTypes.Decimal) logical).getPrecision(),
 ((LogicalTypes.Decimal) logical).getScale());
-
   } else if (logical instanceof LogicalTypes.Date) {
 return Types.DateType.get();
-
   } else if (
   logical instanceof LogicalTypes.TimeMillis ||
-  logical instanceof LogicalTypes.TimeMicros) {
+  logical instanceof LogicalTypes.TimeMicros) {

Review comment:
   Ack. I did not make these changes intentionally; the formatting style from 
the repo applied them. I suggest the same in our internal repo :). I will 
revert these changes.






[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-21 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429003057



##
File path: core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
##
@@ -106,11 +106,27 @@ public Type record(Schema record, List names, List fieldTypes) {
   public Type union(Schema union, List options) {
 Preconditions.checkArgument(AvroSchemaUtil.isOptionSchema(union),
 "Unsupported type: non-option union: %s", union);
-// records, arrays, and maps will check nullability later
-if (options.get(0) == null) {
-  return options.get(1);
-} else {
+if (options.size() == 1) {
   return options.get(0);
+} else if (options.size() == 2) {
+  if (options.get(0) == null) {
+return options.get(1);
+  } else {
+return options.get(0);
+  }
+} else {
+  // Convert complex unions to struct types where field names are member0, member1, etc.
+  // This is consistent with the behavior of the spark Avro SchemaConverter
+  List fields = Lists.newArrayListWithExpectedSize(options.size());
+  for (int i = 0; i < options.size(); i += 1) {
+Type fieldType = options.get(i);
+if (fieldType == null) {

Review comment:
   For the Avro type `null`, fieldType is `null`. This basically ignores null 
in the union.
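
As a library-free sketch of the loop in the diff above: each non-null union branch becomes a struct field named `member<i>`, and a `null` entry (the Avro `null` branch) is skipped. Types are modelled as plain strings. Numbering fields by their position in the union, so that a leading `null` leaves a gap at `member0`, is an assumption; the quoted diff is cut off before it shows how the index is used. `UnionToStructSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class UnionToStructSketch {
  /**
   * Converts the branch types of a complex union into struct field descriptions.
   * A null entry stands for the Avro null branch and produces no field.
   */
  static List<String> toStructFields(List<String> options) {
    List<String> fields = new ArrayList<>(options.size());
    for (int i = 0; i < options.size(); i += 1) {
      String fieldType = options.get(i);
      if (fieldType == null) {
        continue; // ignore the null branch of the union
      }
      // Assumption: the field is named after the branch position in the union.
      fields.add("member" + i + ": " + fieldType);
    }
    return fields;
  }

  public static void main(String[] args) {
    // Models the union ["null", "boolean", "int"].
    System.out.println(toStructFields(Arrays.asList(null, "boolean", "int")));
  }
}
```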






[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-21 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429002602



##
File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
##
@@ -47,20 +47,23 @@ private AvroSchemaUtil() {}
   public static final String VALUE_ID_PROP = "value-id";
   public static final String ELEMENT_ID_PROP = "element-id";
   public static final String ADJUST_TO_UTC_PROP = "adjust-to-utc";
+  public static final String UNION_SCHEMA_TO_RECORD = "union-schema-to-record";
 
   private static final Schema NULL = Schema.create(Schema.Type.NULL);
   private static final Schema.Type MAP = Schema.Type.MAP;
   private static final Schema.Type ARRAY = Schema.Type.ARRAY;
   private static final Schema.Type UNION = Schema.Type.UNION;
   private static final Schema.Type RECORD = Schema.Type.RECORD;
 
-  public static Schema convert(org.apache.iceberg.Schema schema,
-   String tableName) {
+  public static Schema convert(
+  org.apache.iceberg.Schema schema,
+  String tableName) {

Review comment:
   Ack. I did not do this intentionally; the formatting style from .baseline 
applied these changes.






[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-21 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r428785916



##
File path: core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AvroDataUnionRecordTest {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  protected void writeAndValidate(
+  List expected,
+  List expectedRead,
+  Schema icebergSchema) throws IOException {
+File testFile = temp.newFile();
+Assert.assertTrue("Delete should succeed", testFile.delete());
+
+try (FileAppender writer = Avro.write(Files.localOutput(testFile))
+.schema(icebergSchema)
+.named("test")
+.build()) {
+  for (GenericData.Record rec : expected) {
+writer.add(rec);
+  }
+}
+
+List rows;
+try (AvroIterable reader = Avro.read(Files.localInput(testFile))
+.project(icebergSchema)
+.build()) {
+  rows = Lists.newArrayList(reader);
+}
+
+for (int i = 0; i < expectedRead.size(); i += 1) {
+  AvroTestHelpers.assertEquals(icebergSchema.asStruct(), expectedRead.get(i), rows.get(i));
+}
+  }
+
+  @Test
+  public void testMapOfUnionValues() throws IOException {
+String schema1 = "{\n" +
+"  \"name\": \"MapOfUnion\",\n" +
+"  \"type\": \"record\",\n" +
+"  \"fields\": [\n" +
+"{\n" +
+"  \"name\": \"map\",\n" +
+"  \"type\": [\n" +
+"\"null\",\n" +
+"{\n" +
+"  \"type\": \"map\",\n" +
+"  \"values\": [\n" +
+"\"null\",\n" +
+"\"boolean\",\n" +
+"\"int\",\n" +
+"\"long\",\n" +
+"\"float\",\n" +
+"\"double\",\n" +
+"\"bytes\",\n" +
+"\"string\"\n" +
+"  ]\n" +
+"}\n" +
+"  ]\n" +
+"}\n" +
+"  ]\n" +
+"}";
+org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema1);
+org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
+org.apache.avro.Schema avroSchemaUnionRecord = AvroSchemaUtil.convert(icebergSchema, "test");
+org.apache.avro.Schema unionRecordSchema =
+avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1);
+
+List expectedRead = new ArrayList<>();
+List expected = new ArrayList<>();
+
+for (long i = 0; i < 10; i++) {
+  Map map = new HashMap<>();
+  Map mapRead = new HashMap<>();
+  updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i);
+  GenericData.Record recordRead = new GenericRecordBuilder(avroSchema)
+  .set("map", mapRead)
+  .build();
+  GenericData.Record record = new GenericRecordBuilder(avroSchema)
+  .set("map", map)
+  .build();
+  expected.add(record);
+  expectedRead.add(recordRead);
+}
+writeAndValidate(expected, expectedRead, icebergSchema);
+  }
+
+  private void updateMapsForUnionSchema(
+  org.apache.avro.Schema unionRecordSchema,
+  Map map,
+  Map mapRead,
+  Long index) {
+map.put("boolean", index % 2 == 0);
+map.put("int", index.intValue());
+map.put("long", index);
+map.put("float", index.floatValue());
+map.put("double", index.doubleValue());
+map.put("bytes", 

[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-21 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r428785786



##
File path: 
core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AvroDataUnionRecordTest {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  protected void writeAndValidate(
+  List expected,
+  List expectedRead,
+  Schema icebergSchema) throws IOException {
+File testFile = temp.newFile();
+Assert.assertTrue("Delete should succeed", testFile.delete());
+
+try (FileAppender writer = Avro.write(Files.localOutput(testFile))
+.schema(icebergSchema)
+.named("test")
+.build()) {
+  for (GenericData.Record rec : expected) {
+writer.add(rec);
+  }
+}
+
+List rows;
+try (AvroIterable reader = Avro.read(Files.localInput(testFile))
+.project(icebergSchema)
+.build()) {
+  rows = Lists.newArrayList(reader);
+}
+
+for (int i = 0; i < expectedRead.size(); i += 1) {
+  AvroTestHelpers.assertEquals(icebergSchema.asStruct(), expectedRead.get(i), rows.get(i));
+}
+  }
+
+  @Test
+  public void testMapOfUnionValues() throws IOException {
+String schema1 = "{\n" +
+"  \"name\": \"MapOfUnion\",\n" +
+"  \"type\": \"record\",\n" +
+"  \"fields\": [\n" +
+"{\n" +
+"  \"name\": \"map\",\n" +
+"  \"type\": [\n" +
+"\"null\",\n" +
+"{\n" +
+"  \"type\": \"map\",\n" +
+"  \"values\": [\n" +
+"\"null\",\n" +
+"\"boolean\",\n" +
+"\"int\",\n" +
+"\"long\",\n" +
+"\"float\",\n" +
+"\"double\",\n" +
+"\"bytes\",\n" +
+"\"string\"\n" +
+"  ]\n" +
+"}\n" +
+"  ]\n" +
+"}\n" +
+"  ]\n" +
+"}";
+org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema1);
+org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
+org.apache.avro.Schema avroSchemaUnionRecord = AvroSchemaUtil.convert(icebergSchema, "test");
+org.apache.avro.Schema unionRecordSchema =
+avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1);
+
+List expectedRead = new ArrayList<>();
+List expected = new ArrayList<>();
+
+for (long i = 0; i < 10; i++) {
+  Map map = new HashMap<>();
+  Map mapRead = new HashMap<>();
+  updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i);
+  GenericData.Record recordRead = new GenericRecordBuilder(avroSchema)
+  .set("map", mapRead)
+  .build();
+  GenericData.Record record = new GenericRecordBuilder(avroSchema)
+  .set("map", map)
+  .build();
+  expected.add(record);
+  expectedRead.add(recordRead);
+}
+writeAndValidate(expected, expectedRead, icebergSchema);

Review comment:
   ack





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-21 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r428785679



##
File path: api/src/main/java/org/apache/iceberg/types/Types.java
##
@@ -641,6 +650,13 @@ public int hashCode() {
   }
   return fieldsById;
 }
+
+/**
+ * @return true if struct represents union schema
+ */
+public boolean isUnionSchema() {

Review comment:
   I will check whether we can make it package-private. The Avro visitor needs
   it so that it can add a property to the schema to reflect the union type.








[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type

2020-05-21 Thread GitBox


sudssf commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r428785240



##
File path: core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java
##
@@ -133,4 +141,38 @@ private WriteBuilder() {
   }
 }
   }
+
+  public static class UnionSchemaWriter implements ValueWriter {

Review comment:
   Correct.
   Since Iceberg does not support a union type but Avro does, this approach is
   similar to spark-avro, where union types are converted to records.
   I don't think it makes sense to support union types natively, as that would
   mean adding schema support for them, which is a larger change with a bigger
   blast radius.
   Spark does not support union types, so it makes sense to align with Spark and
   keep the scope limited, IMO.
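
   To illustrate the union-as-record idea described in this comment, here is a
   minimal sketch in plain Java with no Avro or Iceberg dependency. The class
   name, both helper methods, and the `member0`, `member1`, ... field names are
   assumptions made for illustration (the naming mirrors what spark-avro does
   for such unions); the code in this PR may name and structure things
   differently.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: represent a union as a record with one optional field per branch.
class UnionAsRecordSketch {

  // Maps each non-null union branch to a field name (assumed naming: member0, member1, ...).
  public static Map<String, String> unionToRecordFields(List<String> branches) {
    Map<String, String> fields = new LinkedHashMap<>();
    int index = 0;
    for (String branch : branches) {
      if (branch.equals("null")) {
        continue; // the null branch is expressed by every field being optional
      }
      fields.put("member" + index, branch);
      index += 1;
    }
    return fields;
  }

  // Builds the record for one value: only the field of the matching branch is set.
  public static Map<String, Object> wrap(List<String> branches, String branch, Object value) {
    Map<String, Object> record = new LinkedHashMap<>();
    for (Map.Entry<String, String> field : unionToRecordFields(branches).entrySet()) {
      record.put(field.getKey(), field.getValue().equals(branch) ? value : null);
    }
    return record;
  }

  public static void main(String[] args) {
    // Same branches as the map value union used in testMapOfUnionValues.
    List<String> branches = Arrays.asList(
        "null", "boolean", "int", "long", "float", "double", "bytes", "string");
    System.out.println(unionToRecordFields(branches));
    System.out.println(wrap(branches, "long", 7L));
  }
}
```

   With this shape, a reader sees an ordinary struct whose fields are all
   optional, which is why no new type has to be added to the Iceberg schema.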




