[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r431403417 ## File path: core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java ## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.avro; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class AvroDataUnionRecordTest { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + protected void writeAndValidate( + List actualWrite, + List expectedRead, + Schema icebergSchema) throws IOException { +File testFile = temp.newFile(); +Assert.assertTrue("Delete should succeed", testFile.delete()); + +try (FileAppender writer = Avro.write(Files.localOutput(testFile)) +.schema(icebergSchema) +.named("test") +.build()) { + for (GenericData.Record rec : actualWrite) { +writer.add(rec); + } +} + +List rows; +try (AvroIterable reader = Avro.read(Files.localInput(testFile)) +.project(icebergSchema) +.build()) { + rows = Lists.newArrayList(reader); +} + +for (int i = 0; i < expectedRead.size(); i += 1) { + AvroTestHelpers.assertEquals(icebergSchema.asStruct(), expectedRead.get(i), rows.get(i)); +} + } + + @Test + public void testMapOfUnionValues() throws IOException { +String schema1 = "{\n" + +" \"name\": \"MapOfUnion\",\n" + +" \"type\": \"record\",\n" + +" \"fields\": [\n" + +"{\n" + +" \"name\": \"map\",\n" + +" \"type\": [\n" + +"\"null\",\n" + +"{\n" + +" \"type\": \"map\",\n" + +" \"values\": [\n" + +"\"null\",\n" + +"\"boolean\",\n" + +"\"int\",\n" + +"\"long\",\n" + +"\"float\",\n" + +"\"double\",\n" + +"\"bytes\",\n" + +"\"string\"\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}"; +org.apache.avro.Schema avroSchema = new 
org.apache.avro.Schema.Parser().parse(schema1); +org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); +org.apache.avro.Schema avroSchemaUnionRecord = AvroSchemaUtil.convert(icebergSchema, "test"); +org.apache.avro.Schema unionRecordSchema = + avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1); + +List expectedRead = new ArrayList<>(); +List actualWrite = new ArrayList<>(); + +for (long i = 0; i < 10; i++) { + Map map = new HashMap<>(); + Map mapRead = new HashMap<>(); + updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i); + GenericData.Record recordRead = new GenericRecordBuilder(avroSchema) + .set("map", mapRead) + .build(); + GenericData.Record record = new GenericRecordBuilder(avroSchema) + .set("map", map) + .build(); + actualWrite.add(record); + expectedRead.add(recordRead); +} +writeAndValidate(actualWrite, expectedRead, icebergSchema); + } + + private void updateMapsForUnionSchema( + org.apache.avro.Schema unionRecordSchema, + Map map, + Map mapRead, + Long index) { +map.put("boolean", index % 2 == 0); +map.put("int", index.intValue()); +map.put("long", index); +map.put("float", index.floatValue()); +map.put("double", index.doubleValue()); +map.put("bytes",
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r430121047 ## File path: api/src/main/java/org/apache/iceberg/types/Types.java ## @@ -526,25 +526,25 @@ public static StructType of(List fields) { return new StructType(fields, false); } -public static StructType of(List fields, boolean isUnionSchema) { - return new StructType(fields, isUnionSchema); +public static StructType of(List fields, boolean convertedFromUnionSchema) { + return new StructType(fields, convertedFromUnionSchema); Review comment: there is no other way to pass this information for `GenericAvroWriter` as it needs to know that input generic record has union schema and valuewriters needs to be created accordingly. otherwise we get cast exception. this avoids creating new generic object with record schema ( which is really expensive IMO). what alternative is possible if don't want to add `convertedFromUnionSchema`? ## File path: core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java ## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.avro; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class AvroDataUnionRecordTest { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + protected void writeAndValidate( + List actualWrite, + List expectedRead, + Schema icebergSchema) throws IOException { +File testFile = temp.newFile(); +Assert.assertTrue("Delete should succeed", testFile.delete()); + +try (FileAppender writer = Avro.write(Files.localOutput(testFile)) +.schema(icebergSchema) +.named("test") +.build()) { + for (GenericData.Record rec : actualWrite) { +writer.add(rec); + } +} + +List rows; +try (AvroIterable reader = Avro.read(Files.localInput(testFile)) +.project(icebergSchema) +.build()) { + rows = Lists.newArrayList(reader); +} + +for (int i = 0; i < expectedRead.size(); i += 1) { + AvroTestHelpers.assertEquals(icebergSchema.asStruct(), expectedRead.get(i), rows.get(i)); +} + } + + @Test + public void testMapOfUnionValues() throws IOException { +String schema1 = "{\n" + +" \"name\": \"MapOfUnion\",\n" + +" \"type\": \"record\",\n" + +" \"fields\": [\n" + +"{\n" + +" \"name\": \"map\",\n" + +" \"type\": [\n" + +"\"null\",\n" + +"{\n" + +" \"type\": \"map\",\n" + +" \"values\": [\n" + +"\"null\",\n" + +"\"boolean\",\n" + +"\"int\",\n" + +"\"long\",\n" + +"\"float\",\n" + +"\"double\",\n" + +"\"bytes\",\n" + +"\"string\"\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}"; +org.apache.avro.Schema avroSchema = new 
org.apache.avro.Schema.Parser().parse(schema1); +org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); +org.apache.avro.Schema avroSchemaUnionRecord = AvroSchemaUtil.convert(icebergSchema, "test"); +org.apache.avro.Schema unionRecordSchema = + avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1); + +List expectedRead = new ArrayList<>(); +List actualWrite = new ArrayList<>(); + +for (long i = 0; i < 10; i++) { + Map map =
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429665159 ## File path: core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java ## @@ -112,7 +112,9 @@ public Schema struct(Types.StructType struct, List fieldSchemas) { } recordSchema = Schema.createRecord(recordName, null, null, false, fields); - +if (struct.isUnionSchema()) { Review comment: The only way for `GenericAvroWriter` to know whether a record represents a union schema type is to use the property set here. Otherwise, the ValueWriters will fail to write a union-schema record, because they will expect the new record schema instead of the union record schema. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429664731 ## File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java ## @@ -120,7 +124,7 @@ public static boolean isTimestamptz(Schema schema) { } public static boolean isOptionSchema(Schema schema) { -if (schema.getType() == UNION && schema.getTypes().size() == 2) { +if (schema.getType() == UNION && schema.getTypes().size() >= 2) { Review comment: Fixed, keeping the existing behaviour for non-union types. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429664072 ## File path: core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java ## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.avro; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class AvroDataUnionRecordTest { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + protected void writeAndValidate( + List actualWrite, + List expectedRead, + Schema icebergSchema) throws IOException { +File testFile = temp.newFile(); +Assert.assertTrue("Delete should succeed", testFile.delete()); + +try (FileAppender writer = Avro.write(Files.localOutput(testFile)) +.schema(icebergSchema) +.named("test") +.build()) { + for (GenericData.Record rec : actualWrite) { +writer.add(rec); + } +} + +List rows; +try (AvroIterable reader = Avro.read(Files.localInput(testFile)) +.project(icebergSchema) +.build()) { + rows = Lists.newArrayList(reader); +} + +for (int i = 0; i < expectedRead.size(); i += 1) { + AvroTestHelpers.assertEquals(icebergSchema.asStruct(), expectedRead.get(i), rows.get(i)); +} + } + + @Test + public void testMapOfUnionValues() throws IOException { +String schema1 = "{\n" + +" \"name\": \"MapOfUnion\",\n" + +" \"type\": \"record\",\n" + +" \"fields\": [\n" + +"{\n" + +" \"name\": \"map\",\n" + +" \"type\": [\n" + +"\"null\",\n" + +"{\n" + +" \"type\": \"map\",\n" + +" \"values\": [\n" + +"\"null\",\n" + +"\"boolean\",\n" + +"\"int\",\n" + +"\"long\",\n" + +"\"float\",\n" + +"\"double\",\n" + +"\"bytes\",\n" + +"\"string\"\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}"; +org.apache.avro.Schema avroSchema = new 
org.apache.avro.Schema.Parser().parse(schema1); +org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); +org.apache.avro.Schema avroSchemaUnionRecord = AvroSchemaUtil.convert(icebergSchema, "test"); +org.apache.avro.Schema unionRecordSchema = + avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1); + +List expectedRead = new ArrayList<>(); +List actualWrite = new ArrayList<>(); + +for (long i = 0; i < 10; i++) { + Map map = new HashMap<>(); + Map mapRead = new HashMap<>(); + updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i); + GenericData.Record recordRead = new GenericRecordBuilder(avroSchema) + .set("map", mapRead) + .build(); + GenericData.Record record = new GenericRecordBuilder(avroSchema) + .set("map", map) + .build(); + actualWrite.add(record); + expectedRead.add(recordRead); +} +writeAndValidate(actualWrite, expectedRead, icebergSchema); + } + + private void updateMapsForUnionSchema( + org.apache.avro.Schema unionRecordSchema, + Map map, + Map mapRead, + Long index) { +map.put("boolean", index % 2 == 0); +map.put("int", index.intValue()); +map.put("long", index); +map.put("float", index.floatValue()); +map.put("double", index.doubleValue()); +map.put("bytes",
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429488334 ## File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java ## @@ -120,7 +124,7 @@ public static boolean isTimestamptz(Schema schema) { } public static boolean isOptionSchema(Schema schema) { -if (schema.getType() == UNION && schema.getTypes().size() == 2) { +if (schema.getType() == UNION && schema.getTypes().size() >= 2) { Review comment: Let me add handling to return false when it's a UNION with size > 2 whose first element is not null. ## File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java ## @@ -120,7 +124,7 @@ public static boolean isTimestamptz(Schema schema) { } public static boolean isOptionSchema(Schema schema) { -if (schema.getType() == UNION && schema.getTypes().size() == 2) { +if (schema.getType() == UNION && schema.getTypes().size() >= 2) { Review comment: Let me add handling to return false when it's a UNION with size > 2 whose first element is not null. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429487872 ## File path: api/src/main/java/org/apache/iceberg/types/Types.java ## @@ -641,6 +650,13 @@ public int hashCode() { } return fieldsById; } + +/** + * @return true if struct represents union schema + */ +public boolean isUnionSchema() { Review comment: I think this is a naming issue. This flag indicates that the field stores a union type by converting it to a record/struct. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429004177 ## File path: core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java ## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.avro; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class AvroDataUnionRecordTest { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + protected void writeAndValidate( + List actualWrite, + List expectedRead, + Schema icebergSchema) throws IOException { +File testFile = temp.newFile(); +Assert.assertTrue("Delete should succeed", testFile.delete()); + +try (FileAppender writer = Avro.write(Files.localOutput(testFile)) +.schema(icebergSchema) +.named("test") +.build()) { + for (GenericData.Record rec : actualWrite) { +writer.add(rec); + } +} + +List rows; +try (AvroIterable reader = Avro.read(Files.localInput(testFile)) +.project(icebergSchema) +.build()) { + rows = Lists.newArrayList(reader); +} + +for (int i = 0; i < expectedRead.size(); i += 1) { + AvroTestHelpers.assertEquals(icebergSchema.asStruct(), expectedRead.get(i), rows.get(i)); +} + } + + @Test + public void testMapOfUnionValues() throws IOException { +String schema1 = "{\n" + +" \"name\": \"MapOfUnion\",\n" + +" \"type\": \"record\",\n" + +" \"fields\": [\n" + +"{\n" + +" \"name\": \"map\",\n" + +" \"type\": [\n" + +"\"null\",\n" + +"{\n" + +" \"type\": \"map\",\n" + +" \"values\": [\n" + +"\"null\",\n" + +"\"boolean\",\n" + +"\"int\",\n" + +"\"long\",\n" + +"\"float\",\n" + +"\"double\",\n" + +"\"bytes\",\n" + +"\"string\"\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}"; +org.apache.avro.Schema avroSchema = new 
org.apache.avro.Schema.Parser().parse(schema1); +org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); +org.apache.avro.Schema avroSchemaUnionRecord = AvroSchemaUtil.convert(icebergSchema, "test"); +org.apache.avro.Schema unionRecordSchema = + avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1); + +List expectedRead = new ArrayList<>(); +List actualWrite = new ArrayList<>(); + +for (long i = 0; i < 10; i++) { + Map map = new HashMap<>(); + Map mapRead = new HashMap<>(); + updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i); + GenericData.Record recordRead = new GenericRecordBuilder(avroSchema) + .set("map", mapRead) + .build(); + GenericData.Record record = new GenericRecordBuilder(avroSchema) + .set("map", map) + .build(); + actualWrite.add(record); + expectedRead.add(recordRead); +} +writeAndValidate(actualWrite, expectedRead, icebergSchema); + } + + private void updateMapsForUnionSchema( + org.apache.avro.Schema unionRecordSchema, + Map map, + Map mapRead, + Long index) { +map.put("boolean", index % 2 == 0); +map.put("int", index.intValue()); +map.put("long", index); +map.put("float", index.floatValue()); +map.put("double", index.doubleValue()); +map.put("bytes",
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429003645 ## File path: core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java ## @@ -112,7 +112,9 @@ public Schema struct(Types.StructType struct, List fieldSchemas) { } recordSchema = Schema.createRecord(recordName, null, null, false, fields); - +if (struct.isUnionSchema()) { Review comment: For Spark, spark-avro converts it into a record, so no changes are needed there. I will add a test for Spark. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429003506 ## File path: core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java ## @@ -112,7 +112,9 @@ public Schema struct(Types.StructType struct, List fieldSchemas) { } recordSchema = Schema.createRecord(recordName, null, null, false, fields); - +if (struct.isUnionSchema()) { Review comment: This is needed when the Avro writer is a Java client that writes directly by consuming Avro records. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429003306 ## File path: core/src/main/java/org/apache/iceberg/avro/SchemaToType.java ## @@ -169,18 +184,15 @@ public Type primitive(Schema primitive) { return Types.DecimalType.of( ((LogicalTypes.Decimal) logical).getPrecision(), ((LogicalTypes.Decimal) logical).getScale()); - } else if (logical instanceof LogicalTypes.Date) { return Types.DateType.get(); - } else if ( logical instanceof LogicalTypes.TimeMillis || - logical instanceof LogicalTypes.TimeMicros) { + logical instanceof LogicalTypes.TimeMicros) { Review comment: Ack. I did not make these changes intentionally; the repo's formatting style applied them. I suggest the same in our internal repo :) I will revert these changes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429003057 ## File path: core/src/main/java/org/apache/iceberg/avro/SchemaToType.java ## @@ -106,11 +106,27 @@ public Type record(Schema record, List names, List fieldTypes) { public Type union(Schema union, List options) { Preconditions.checkArgument(AvroSchemaUtil.isOptionSchema(union), "Unsupported type: non-option union: %s", union); -// records, arrays, and maps will check nullability later -if (options.get(0) == null) { - return options.get(1); -} else { +if (options.size() == 1) { return options.get(0); +} else if (options.size() == 2) { + if (options.get(0) == null) { +return options.get(1); + } else { +return options.get(0); + } +} else { + // Convert complex unions to struct types where field names are member0, member1, etc. + // This is consistent with the behavior of the spark Avro SchemaConverter + List fields = Lists.newArrayListWithExpectedSize(options.size()); + for (int i = 0; i < options.size(); i += 1) { +Type fieldType = options.get(i); +if (fieldType == null) { Review comment: For the Avro type `null`, fieldType is `null`, so this basically ignores the null member of the union. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r429002602 ## File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java ## @@ -47,20 +47,23 @@ private AvroSchemaUtil() {} public static final String VALUE_ID_PROP = "value-id"; public static final String ELEMENT_ID_PROP = "element-id"; public static final String ADJUST_TO_UTC_PROP = "adjust-to-utc"; + public static final String UNION_SCHEMA_TO_RECORD = "union-schema-to-record"; private static final Schema NULL = Schema.create(Schema.Type.NULL); private static final Schema.Type MAP = Schema.Type.MAP; private static final Schema.Type ARRAY = Schema.Type.ARRAY; private static final Schema.Type UNION = Schema.Type.UNION; private static final Schema.Type RECORD = Schema.Type.RECORD; - public static Schema convert(org.apache.iceberg.Schema schema, - String tableName) { + public static Schema convert( + org.apache.iceberg.Schema schema, + String tableName) { Review comment: Ack. I did not do it intentionally; the formatting style from .baseline applied these changes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r428785916 ## File path: core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.avro; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class AvroDataUnionRecordTest { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + protected void writeAndValidate( + List expected, + List expectedRead, + Schema icebergSchema) throws IOException { +File testFile = temp.newFile(); +Assert.assertTrue("Delete should succeed", testFile.delete()); + +try (FileAppender writer = Avro.write(Files.localOutput(testFile)) +.schema(icebergSchema) +.named("test") +.build()) { + for (GenericData.Record rec : expected) { +writer.add(rec); + } +} + +List rows; +try (AvroIterable reader = Avro.read(Files.localInput(testFile)) +.project(icebergSchema) +.build()) { + rows = Lists.newArrayList(reader); +} + +for (int i = 0; i < expectedRead.size(); i += 1) { + AvroTestHelpers.assertEquals(icebergSchema.asStruct(), expectedRead.get(i), rows.get(i)); +} + } + + @Test + public void testMapOfUnionValues() throws IOException { +String schema1 = "{\n" + +" \"name\": \"MapOfUnion\",\n" + +" \"type\": \"record\",\n" + +" \"fields\": [\n" + +"{\n" + +" \"name\": \"map\",\n" + +" \"type\": [\n" + +"\"null\",\n" + +"{\n" + +" \"type\": \"map\",\n" + +" \"values\": [\n" + +"\"null\",\n" + +"\"boolean\",\n" + +"\"int\",\n" + +"\"long\",\n" + +"\"float\",\n" + +"\"double\",\n" + +"\"bytes\",\n" + +"\"string\"\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}"; +org.apache.avro.Schema avroSchema = new 
org.apache.avro.Schema.Parser().parse(schema1); +org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); +org.apache.avro.Schema avroSchemaUnionRecord = AvroSchemaUtil.convert(icebergSchema, "test"); +org.apache.avro.Schema unionRecordSchema = + avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1); + +List expectedRead = new ArrayList<>(); +List expected = new ArrayList<>(); + +for (long i = 0; i < 10; i++) { + Map map = new HashMap<>(); + Map mapRead = new HashMap<>(); + updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i); + GenericData.Record recordRead = new GenericRecordBuilder(avroSchema) + .set("map", mapRead) + .build(); + GenericData.Record record = new GenericRecordBuilder(avroSchema) + .set("map", map) + .build(); + expected.add(record); + expectedRead.add(recordRead); +} +writeAndValidate(expected, expectedRead, icebergSchema); + } + + private void updateMapsForUnionSchema( + org.apache.avro.Schema unionRecordSchema, + Map map, + Map mapRead, + Long index) { +map.put("boolean", index % 2 == 0); +map.put("int", index.intValue()); +map.put("long", index); +map.put("float", index.floatValue()); +map.put("double", index.doubleValue()); +map.put("bytes",
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r428785786 ## File path: core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.avro; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class AvroDataUnionRecordTest { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + protected void writeAndValidate( + List expected, + List expectedRead, + Schema icebergSchema) throws IOException { +File testFile = temp.newFile(); +Assert.assertTrue("Delete should succeed", testFile.delete()); + +try (FileAppender writer = Avro.write(Files.localOutput(testFile)) +.schema(icebergSchema) +.named("test") +.build()) { + for (GenericData.Record rec : expected) { +writer.add(rec); + } +} + +List rows; +try (AvroIterable reader = Avro.read(Files.localInput(testFile)) +.project(icebergSchema) +.build()) { + rows = Lists.newArrayList(reader); +} + +for (int i = 0; i < expectedRead.size(); i += 1) { + AvroTestHelpers.assertEquals(icebergSchema.asStruct(), expectedRead.get(i), rows.get(i)); +} + } + + @Test + public void testMapOfUnionValues() throws IOException { +String schema1 = "{\n" + +" \"name\": \"MapOfUnion\",\n" + +" \"type\": \"record\",\n" + +" \"fields\": [\n" + +"{\n" + +" \"name\": \"map\",\n" + +" \"type\": [\n" + +"\"null\",\n" + +"{\n" + +" \"type\": \"map\",\n" + +" \"values\": [\n" + +"\"null\",\n" + +"\"boolean\",\n" + +"\"int\",\n" + +"\"long\",\n" + +"\"float\",\n" + +"\"double\",\n" + +"\"bytes\",\n" + +"\"string\"\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}\n" + +" ]\n" + +"}"; +org.apache.avro.Schema avroSchema = new 
org.apache.avro.Schema.Parser().parse(schema1); +org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); +org.apache.avro.Schema avroSchemaUnionRecord = AvroSchemaUtil.convert(icebergSchema, "test"); +org.apache.avro.Schema unionRecordSchema = + avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1); + +List expectedRead = new ArrayList<>(); +List expected = new ArrayList<>(); + +for (long i = 0; i < 10; i++) { + Map map = new HashMap<>(); + Map mapRead = new HashMap<>(); + updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i); + GenericData.Record recordRead = new GenericRecordBuilder(avroSchema) + .set("map", mapRead) + .build(); + GenericData.Record record = new GenericRecordBuilder(avroSchema) + .set("map", map) + .build(); + expected.add(record); + expectedRead.add(recordRead); +} +writeAndValidate(expected, expectedRead, icebergSchema); Review comment: ack This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r428785679 ## File path: api/src/main/java/org/apache/iceberg/types/Types.java ## @@ -641,6 +650,13 @@ public int hashCode() { } return fieldsById; } + +/** + * @return true if struct represents union schema + */ +public boolean isUnionSchema() { Review comment: I will check whether we can make it package-private. The Avro visitor needs it so that it can add a property to the schema that reflects the union type. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [incubator-iceberg] sudssf commented on a change in pull request #1046: ISSUE-189: Add support for union record type
sudssf commented on a change in pull request #1046: URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r428785240 ## File path: core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java ## @@ -133,4 +141,38 @@ private WriteBuilder() { } } } + + public static class UnionSchemaWriter implements ValueWriter { Review comment: Correct. Since Iceberg does not support union types but Avro does, this approach is similar to spark-avro, where union types are converted to records. I don't think it makes sense to support union types directly, because that would mean adding union support to the schema, which is a larger change with a bigger blast radius. Spark does not support union types either, so IMO it makes sense to align with Spark and keep the scope limited. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org