[GitHub] [incubator-hudi] umehrot2 commented on a change in pull request #1427: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema
umehrot2 commented on a change in pull request #1427: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema
URL: https://github.com/apache/incubator-hudi/pull/1427#discussion_r399719119

## File path: hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java ##
@@ -168,7 +169,7 @@ public static GenericRecord addHoodieKeyToRecord(GenericRecord record, String re
 */
 public static Schema appendNullSchemaFields(Schema schema, List newFieldNames) {
 List newFields = schema.getFields().stream()
-.map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultValue())).collect(Collectors.toList());
+.map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultVal())).collect(Collectors.toList());
 for (String newField : newFieldNames) {
 newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()));

Review comment: Shall we change this line as well, so that it consistently uses the Object-based API instead of `NullNode.getInstance()`?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
umehrot2 commented on a change in pull request #1427: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema
URL: https://github.com/apache/incubator-hudi/pull/1427#discussion_r399722622

## File path: hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java ##
@@ -214,6 +220,63 @@ private static GenericRecord rewrite(GenericRecord record, Schema schemaWithFiel
 return newRecord;
 }
+ /*
+ This function takes the union of all the fields except hoodie metadata fields
+ */
+ private static List getAllFieldsToWrite(Schema oldSchema, Schema newSchema) {
+Set allFields = new HashSet<>(oldSchema.getFields());
+List fields = new ArrayList<>(oldSchema.getFields());
+for (Schema.Field f : newSchema.getFields()) {
+ if (!allFields.contains(f) && !isMetadataField(f.name())) {
+fields.add(f);
+ }
+}
+
+return fields;
+ }
+
+ private static void populateNewRecordAsPerDataType(GenericRecord record, Field field) {

Review comment: nit: Maybe rename this to `populateFieldWithDefaultValue`, since that seems to be the intent of this function.
umehrot2 commented on a change in pull request #1427: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema
URL: https://github.com/apache/incubator-hudi/pull/1427#discussion_r399722115

## File path: hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java ##
@@ -214,6 +220,63 @@ private static GenericRecord rewrite(GenericRecord record, Schema schemaWithFiel
 return newRecord;
 }
+ /*
+ This function takes the union of all the fields except hoodie metadata fields
+ */
+ private static List getAllFieldsToWrite(Schema oldSchema, Schema newSchema) {
+Set allFields = new HashSet<>(oldSchema.getFields());
+List fields = new ArrayList<>(oldSchema.getFields());
+for (Schema.Field f : newSchema.getFields()) {
+ if (!allFields.contains(f) && !isMetadataField(f.name())) {
+fields.add(f);
+ }
+}
+
+return fields;
+ }
+
+ private static void populateNewRecordAsPerDataType(GenericRecord record, Field field) {
+switch (getSchemaTypeForField(field)) {
+ case STRING:
+ case BYTES:
+ case ENUM:
+ case FIXED:
+record.put(field.name(), field.defaultVal() == null ? null : (String) field.defaultVal());

Review comment: Why do we need to cast to the individual data types? It seems we can just pass `field.defaultVal()` as is. `record.put()` expects an `Object`, and `field.defaultVal()` returns exactly that.
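Combining the rename nit with this comment, here is a minimal sketch of what `populateFieldWithDefaultValue` could reduce to. It is plain Java with hypothetical stand-ins:

- `DefaultFieldStub` plays `Schema.Field`, and its `defaultVal` plays `field.defaultVal()`.
- A `Map<String, Object>` plays `GenericRecord`. Its `put(String, Object)`, like the record's, already accepts any `Object`.

The second method copies the shape of the `(long)` branch from the diff. It shows that such a cast is at best a round trip through unboxing and re-boxing. If the runtime type ever differs from the cast target (an `Integer` reaching the `(long)` branch, say), the cast throws a `ClassCastException`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Avro's Schema.Field: only a name and a default value.
final class DefaultFieldStub {
  final String name;
  final Object defaultVal; // plays the role of Field.defaultVal(), which returns Object

  DefaultFieldStub(String name, Object defaultVal) {
    this.name = name;
    this.defaultVal = defaultVal;
  }
}

final class DefaultValueSketch {
  // The map stands in for GenericRecord; put(String, Object) takes any Object,
  // so the default is stored as-is. A null default simply stays null.
  static void populateFieldWithDefaultValue(Map<String, Object> record, DefaultFieldStub field) {
    record.put(field.name, field.defaultVal);
  }

  // Same shape as the LONG branch in the diff: cast (unbox), then re-box on put.
  // Throws ClassCastException when the default is not actually a Long.
  static void populateWithLongCast(Map<String, Object> record, DefaultFieldStub field) {
    record.put(field.name, field.defaultVal == null ? null : (long) field.defaultVal);
  }
}
```

Passing the value through keeps one code path for every Avro type. Any type checking is then left to whatever consumes the record.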
umehrot2 commented on a change in pull request #1427: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema
URL: https://github.com/apache/incubator-hudi/pull/1427#discussion_r399722305

## File path: hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java ##
@@ -214,6 +220,63 @@ private static GenericRecord rewrite(GenericRecord record, Schema schemaWithFiel
 return newRecord;
 }
+ /*
+ This function takes the union of all the fields except hoodie metadata fields
+ */
+ private static List getAllFieldsToWrite(Schema oldSchema, Schema newSchema) {
+Set allFields = new HashSet<>(oldSchema.getFields());
+List fields = new ArrayList<>(oldSchema.getFields());
+for (Schema.Field f : newSchema.getFields()) {
+ if (!allFields.contains(f) && !isMetadataField(f.name())) {
+fields.add(f);
+ }
+}
+
+return fields;
+ }
+
+ private static void populateNewRecordAsPerDataType(GenericRecord record, Field field) {
+switch (getSchemaTypeForField(field)) {
+ case STRING:
+ case BYTES:
+ case ENUM:
+ case FIXED:
+record.put(field.name(), field.defaultVal() == null ? null : (String) field.defaultVal());
+break;
+ case LONG:
+record.put(field.name(), field.defaultVal() == null ? null : (long) field.defaultVal());
+break;
+ case INT:
+record.put(field.name(), field.defaultVal() == null ? null : (int) field.defaultVal());
+break;
+ case FLOAT:
+record.put(field.name(), field.defaultVal() == null ? null : (float) field.defaultVal());
+break;
+ case DOUBLE:
+record.put(field.name(), field.defaultVal() == null ? null : (double) field.defaultVal());
+break;
+ case BOOLEAN:
+record.put(field.name(), field.defaultVal() == null ? null : (boolean) field.defaultVal());
+break;
+ default:
+record.put(field.name(), field.defaultVal());
+}
+ }
+
+ private static Schema.Type getSchemaTypeForField(Field field) {
+if (!field.schema().getType().equals(Schema.Type.UNION)) {
+ return field.schema().getType();
+}
+
+for (Schema schema : field.schema().getTypes()) {
+ if (!schema.getType().equals(Schema.Type.NULL)) {
+return schema.getType();
+ }
+}
+
+return Schema.Type.STRING;

Review comment: Shouldn't we return `Schema.Type.NULL` here? It seems the only way to reach this line is when every branch of the union is `NULL`.
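The lookup with the suggested `NULL` fallback can be sketched as follows. `SchemaStub` and `TypeTag` are hypothetical plain-Java stand-ins for `org.apache.avro.Schema` and `Schema.Type`. They are reduced to a type tag plus the branches of a union, so the snippet runs without Avro.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Schema.Type (only the tags this sketch needs).
enum TypeTag { NULL, STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, UNION }

// Hypothetical stand-in for Schema: a type tag plus branch schemas for unions.
final class SchemaStub {
  final TypeTag type;
  final List<SchemaStub> branches; // only meaningful when type == UNION

  SchemaStub(TypeTag type, SchemaStub... branches) {
    this.type = type;
    this.branches = Collections.unmodifiableList(Arrays.asList(branches));
  }
}

final class SchemaTypeLookup {
  // Non-union schemas report their own type; unions report the first non-NULL branch.
  // If no such branch exists (every branch is NULL), report NULL rather than STRING.
  static TypeTag getSchemaTypeForField(SchemaStub fieldSchema) {
    if (fieldSchema.type != TypeTag.UNION) {
      return fieldSchema.type;
    }
    for (SchemaStub branch : fieldSchema.branches) {
      if (branch.type != TypeTag.NULL) {
        return branch.type;
      }
    }
    return TypeTag.NULL;
  }
}
```

Returning `NULL` keeps the result truthful to the schema. A `STRING` fallback would route an all-null union into the string-casting branch.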
umehrot2 commented on a change in pull request #1427: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema
URL: https://github.com/apache/incubator-hudi/pull/1427#discussion_r399718998

## File path: hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java ##
@@ -104,15 +103,15 @@ public static Schema addMetadataFields(Schema schema) {
 List parentFields = new ArrayList<>();
 Schema.Field commitTimeField =
-new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
+new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", null);

Review comment: Minor: we should probably write `(Object) null` to force resolution to the new API that accepts an `Object`. A bare `null` matches both the `JsonNode` and the `Object` overloads, and Java then picks the more specific (deprecated) `JsonNode` one.
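The overload point can be checked in plain Java. The classes below are hypothetical and only record which constructor ran:

- `FieldCtorStub` is shaped like the two four-argument `Schema.Field` constructors discussed here, one taking a `JsonNode` default and one taking an `Object` default.
- `JsonNodeStub` stands in for `JsonNode`.

```java
// Hypothetical stand-in for JsonNode; only its place in the type hierarchy matters.
class JsonNodeStub {}

// Hypothetical class mirroring the two 4-argument constructor shapes; records which one ran.
final class FieldCtorStub {
  final String chosenOverload;

  @Deprecated
  FieldCtorStub(String name, String schema, String doc, JsonNodeStub defaultValue) {
    this.chosenOverload = "JsonNode";
  }

  FieldCtorStub(String name, String schema, String doc, Object defaultValue) {
    this.chosenOverload = "Object";
  }
}
```

Both calls compile. A bare `null` selects the `JsonNodeStub` overload, because it is the most specific applicable one. After `(Object) null`, only the `Object` overload is applicable.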
umehrot2 commented on a change in pull request #1427: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema
URL: https://github.com/apache/incubator-hudi/pull/1427#discussion_r399721017

## File path: hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java ##
@@ -204,8 +205,13 @@ public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord r
 private static GenericRecord rewrite(GenericRecord record, Schema schemaWithFields, Schema newSchema) {
 GenericRecord newRecord = new GenericData.Record(newSchema);
-for (Schema.Field f : schemaWithFields.getFields()) {
- newRecord.put(f.name(), record.get(f.name()));
+//get union of both the schemas, and then populate the fields in the new record
+for (Schema.Field f : getAllFieldsToWrite(schemaWithFields, newSchema)) {

Review comment: This is an internal function used by both `rewriteRecordWithOnlyNewSchemaFields` and `rewriteRecord`. `getAllFieldsToWrite` does not really make sense for `rewriteRecordWithOnlyNewSchemaFields`, and won't do anything there, because the old and new schemas are the same. I think it would be better to refactor `rewrite` to receive a `List fieldsToWrite` parameter instead of `schemaWithFields`. `rewriteRecord` can then call `getAllFieldsToWrite` and pass its result, while `rewriteRecordWithOnlyNewSchemaFields` just passes `schema.getFields()`.
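A minimal sketch of the refactoring proposed in this comment, in plain Java with hypothetical stand-ins: `FieldDef` for `Schema.Field`, `List<FieldDef>` for a schema's fields and `Map<String, Object>` for `GenericRecord`.

- `rewrite` receives the fields to write.
- `rewriteRecord` passes the union from a `getAllFieldsToWrite` equivalent.
- `rewriteRecordWithOnlyNewSchemaFields` passes the new schema's fields directly.

Two simplifications are assumptions, not Hudi behavior. Fields are matched by name rather than by `Schema.Field.equals`. `isMetadataField` is approximated by the `_hoodie_` column-name prefix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Schema.Field: a name plus a default value.
final class FieldDef {
  final String name;
  final Object defaultVal;

  FieldDef(String name, Object defaultVal) {
    this.name = name;
    this.defaultVal = defaultVal;
  }
}

final class RewriteSketch {
  // Assumption: approximates Hudi's metadata-field check by the "_hoodie_" prefix.
  static boolean isMetadataField(String name) {
    return name.startsWith("_hoodie_");
  }

  // Old fields first, then non-metadata new fields whose names the old schema lacks.
  static List<FieldDef> getAllFieldsToWrite(List<FieldDef> oldFields, List<FieldDef> newFields) {
    Set<String> oldNames = new HashSet<>();
    for (FieldDef f : oldFields) {
      oldNames.add(f.name);
    }
    List<FieldDef> fields = new ArrayList<>(oldFields);
    for (FieldDef f : newFields) {
      if (!oldNames.contains(f.name) && !isMetadataField(f.name)) {
        fields.add(f);
      }
    }
    return fields;
  }

  // Refactored core: the caller decides which fields are written.
  static Map<String, Object> rewrite(Map<String, Object> record, List<FieldDef> fieldsToWrite) {
    Map<String, Object> newRecord = new LinkedHashMap<>();
    for (FieldDef f : fieldsToWrite) {
      // Copy the existing value, or fall back to the field's default if the record lacks it.
      newRecord.put(f.name, record.containsKey(f.name) ? record.get(f.name) : f.defaultVal);
    }
    return newRecord;
  }

  // Old and new schemas may differ, so write the union of their fields.
  static Map<String, Object> rewriteRecord(
      Map<String, Object> record, List<FieldDef> oldFields, List<FieldDef> newFields) {
    return rewrite(record, getAllFieldsToWrite(oldFields, newFields));
  }

  // Only the new schema's own fields are written; no union is computed.
  static Map<String, Object> rewriteRecordWithOnlyNewSchemaFields(
      Map<String, Object> record, List<FieldDef> newFields) {
    return rewrite(record, newFields);
  }
}
```

Consider an old schema holding only `_row_key` and a new schema adding `col1` with default `dummy_val`. In that case `rewriteRecord` writes `col1 = dummy_val`, and the new schema's `_hoodie_` field is skipped.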
umehrot2 commented on a change in pull request #1427: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema
URL: https://github.com/apache/incubator-hudi/pull/1427#discussion_r397699488

## File path: hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieAvroUtils.java ##
@@ -57,4 +60,16 @@ public void testPropsPresent() {
 }
 Assert.assertTrue("column pii_col doesn't show up", piiPresent);
 }
+
+ @Test
+ public void testDefaultValue() {
+GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
+rec.put("_row_key", "key1");
+rec.put("non_pii_col", "val1");
+rec.put("pii_col", "val2");
+rec.put("timestamp", 3.5);

Review comment: My bad, I was thinking only from the point of view of the DataSource writer (`HoodieSparkSqlWriter`), where the schema is determined automatically from the `DataFrame` and converted to an Avro schema. I missed that `DeltaStreamer` uses the `schema provider`, through which users can pass a schema directly to the `HoodieWriteClient`. Thanks for the details!

I have a question about the schema evolution example you provided. The `rewriteRecord()` you are testing here uses the schema from the old record, and rewrites it by setting only the fields found in the old schema. So if you rewrite records R1 and R2, their schema will not have the new `col1` field, right? Hence your code for populating default values will not be executed, because `col1` is not among the old schema's fields.

It seems this test case passes only because you are not evolving the schema here: your old and new records both have the same schema. If your old record's schema were different, I think you would run into the same issue. Am I missing something here?
umehrot2 commented on a change in pull request #1427: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema
URL: https://github.com/apache/incubator-hudi/pull/1427#discussion_r397037244

## File path: hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieAvroUtils.java ##
@@ -57,4 +60,16 @@ public void testPropsPresent() {
 }
 Assert.assertTrue("column pii_col doesn't show up", piiPresent);
 }
+
+ @Test
+ public void testDefaultValue() {
+GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
+rec.put("_row_key", "key1");
+rec.put("non_pii_col", "val1");
+rec.put("pii_col", "val2");
+rec.put("timestamp", 3.5);

Review comment: Can you help me understand how you are running into this issue with default values? Based on my understanding, conversion to Avro is internal to Hudi, and a custom Avro schema (with default values) is not something users can pass themselves. Also, when `spark-avro` converts a struct schema to Avro, there is no special handling of default values. So I am not sure whether this is an issue in the first place.
umehrot2 commented on a change in pull request #1427: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema
URL: https://github.com/apache/incubator-hudi/pull/1427#discussion_r396877351

## File path: hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieAvroUtils.java ##
@@ -57,4 +60,16 @@ public void testPropsPresent() {
 }
 Assert.assertTrue("column pii_col doesn't show up", piiPresent);
 }
+
+ @Test
+ public void testDefaultValue() {
+GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
+rec.put("_row_key", "key1");
+rec.put("non_pii_col", "val1");
+rec.put("pii_col", "val2");
+rec.put("timestamp", 3.5);

Review comment: So the issue seems to be that, in the original record created this way, the default values show up as `null`. Even though you have specified `default: dummy_val`, the value still shows up as `null` in the original record. Do you know why that is? When we have specified a default value, why doesn't Avro put it in the record when the field is missing?

I tried using the builder, but it expects every field to have a default value (none are set here), and otherwise throws an exception:
```
GenericRecord rec = new GenericRecordBuilder(new Schema.Parser().parse(EXAMPLE_SCHEMA)).build();
```
Do you have any more research on why Avro behaves this way?
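As far as I understand Avro, the two behaviors differ as follows:

- Constructing a `GenericData.Record` only allocates empty slots. A fresh record therefore reads `null` for every field it was not given, whatever default the schema declares.
- `GenericRecordBuilder.build()` fills each unset field from its default and throws for an unset field that has none. That would explain the exception above.

The plain-Java model below contrasts the two. `DefaultsModel` is hypothetical and is not Avro code. With the real API, the builder route would set at least every field that lacks a default before calling `build()`, e.g. `new GenericRecordBuilder(schema).set("_row_key", "key1")...build()`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, not Avro code. A field with an entry in `defaults` declares a
// default (possibly null); a field without an entry declares none.
final class DefaultsModel {
  private final List<String> fieldNames;
  private final Map<String, Object> defaults;
  private final Map<String, Object> setValues = new HashMap<>();

  DefaultsModel(List<String> fieldNames, Map<String, Object> defaults) {
    this.fieldNames = fieldNames;
    this.defaults = defaults;
  }

  DefaultsModel set(String name, Object value) {
    setValues.put(name, value);
    return this;
  }

  // Like a freshly constructed record plus put(...) calls: unset slots stay null.
  Map<String, Object> asBareRecord() {
    Map<String, Object> rec = new LinkedHashMap<>();
    for (String n : fieldNames) {
      rec.put(n, setValues.get(n));
    }
    return rec;
  }

  // Like a builder's build(): unset fields take their default, or fail if they have none.
  Map<String, Object> build() {
    Map<String, Object> rec = new LinkedHashMap<>();
    for (String n : fieldNames) {
      if (setValues.containsKey(n)) {
        rec.put(n, setValues.get(n));
      } else if (defaults.containsKey(n)) {
        rec.put(n, defaults.get(n));
      } else {
        throw new IllegalStateException("Field " + n + " not set and has no default value");
      }
    }
    return rec;
  }
}
```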
umehrot2 commented on a change in pull request #1427: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema
URL: https://github.com/apache/incubator-hudi/pull/1427#discussion_r396877767

## File path: hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java ##
@@ -104,15 +103,15 @@ public static Schema addMetadataFields(Schema schema) {
 List parentFields = new ArrayList<>();
 Schema.Field commitTimeField =
-new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
+new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", null);

Review comment: Are you making these changes to avoid using deprecated APIs?