[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r513927391

File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java

```diff
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(LogicalType logicalType) {
-        return convertToSchema(logicalType, "record");
+        return convertToSchema(logicalType, "record", true);
     }
 
     /**
      * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
      *
      * @param logicalType logical type
      * @param rowName the record name
+     * @param top whether it is parsing the root record;
+     *            if it is, the logical type nullability would be ignored
      * @return Avro's {@link Schema} matching this logical type.
      */
-    public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+    public static Schema convertToSchema(
+            LogicalType logicalType,
+            String rowName,
+            boolean top) {
```

Review comment:
@dawidwys I have changed the schema row type to always be non-nullable; please take another look when you have time, thanks so much.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r513370899

File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java

```diff
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(LogicalType logicalType) {
-        return convertToSchema(logicalType, "record");
+        return convertToSchema(logicalType, "record", true);
     }
 
     /**
      * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
      *
      * @param logicalType logical type
      * @param rowName the record name
+     * @param top whether it is parsing the root record;
+     *            if it is, the logical type nullability would be ignored
      * @return Avro's {@link Schema} matching this logical type.
      */
-    public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+    public static Schema convertToSchema(
+            LogicalType logicalType,
+            String rowName,
+            boolean top) {
```

Review comment:
I had an offline discussion with @wuchong and @dawidwys, and after some research we found that a non-nullable row type is more reasonable. But because the change is huge (much of the code that converts a type info to a data type assumes nullable = true), @wuchong and I decided to change the method signature to `convertToSchema(RowType schema)` and add a note to the method doc that the passed-in `schema` must be the top-level record type.
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r513264987

File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java

```diff
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(LogicalType logicalType) {
-        return convertToSchema(logicalType, "record");
+        return convertToSchema(logicalType, "record", true);
     }
 
     /**
      * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
      *
      * @param logicalType logical type
      * @param rowName the record name
+     * @param top whether it is parsing the root record;
+     *            if it is, the logical type nullability would be ignored
      * @return Avro's {@link Schema} matching this logical type.
      */
-    public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+    public static Schema convertToSchema(
+            LogicalType logicalType,
+            String rowName,
+            boolean top) {
```

Review comment:
> Excuse me, but I wholeheartedly disagree with your statement. null =/= (null, null). (null, null) is still NOT NULL. A whole row in a Table can not be null. Only particular columns can be null. Therefore the top level row of a Table is always NOT NULL.

You can test it in PostgreSQL with the following SQL:

```sql
create type my_type as (a int, b varchar(20));

create table t1(
  f0 my_type,
  f1 varchar(20)
);

insert into t1 values((null, null), 'def');

select f0 is null from t1; -- it returns true
```
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r513162229

File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java

```diff
@@ -297,32 +300,53 @@ private static DataType convertToDataType(Schema schema) {
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(LogicalType logicalType) {
+        // If it is parsing the root row type, switches from nullable true to false
+        // because a nullable row type is meaningless and would generate wrong schema.
+        if (logicalType.getTypeRoot() == LogicalTypeRoot.ROW
```

Review comment:
Although `AvroSchemaConverter` is a tool class, it is still used as a public API, so I'm inclined to keep the signature unchanged. Another reason is that the logical type alone is enough for the conversion. I have added more documentation to the methods.
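The control flow described in the hunk above — forcing a nullable root ROW type to non-nullable before conversion — can be sketched with simplified stand-in types. These are not Flink's real `LogicalType` classes, just an illustration of the branch under discussion:

```java
// Simplified stand-ins illustrating the root-row nullability switch.
// These are NOT Flink's real classes; names here are hypothetical.
enum TypeRoot { ROW, INT, VARCHAR }

class SimpleLogicalType {
    final TypeRoot root;
    final boolean nullable;

    SimpleLogicalType(TypeRoot root, boolean nullable) {
        this.root = root;
        this.nullable = nullable;
    }

    SimpleLogicalType copy(boolean nullable) {
        return new SimpleLogicalType(root, nullable);
    }
}

class RootRowNormalizer {
    // If the root type is a nullable ROW, drop the nullability before schema
    // conversion: a nullable top-level record would otherwise be emitted as a
    // union schema ["null", record], which is not what a table row should be.
    static SimpleLogicalType normalizeRoot(SimpleLogicalType type) {
        if (type.root == TypeRoot.ROW && type.nullable) {
            return type.copy(false);
        }
        return type;
    }
}
```

Nested rows are untouched by this switch; only the root of the tree is normalized, which is why the real change could keep the public single-argument signature.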
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r513135789

File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java

```diff
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(LogicalType logicalType) {
-        return convertToSchema(logicalType, "record");
+        return convertToSchema(logicalType, "record", true);
     }
 
     /**
      * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
      *
      * @param logicalType logical type
      * @param rowName the record name
+     * @param top whether it is parsing the root record;
+     *            if it is, the logical type nullability would be ignored
      * @return Avro's {@link Schema} matching this logical type.
      */
-    public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+    public static Schema convertToSchema(
+            LogicalType logicalType,
+            String rowName,
+            boolean top) {
```

Review comment:
> In SQL the outer row is always NOT NULL

That is not true; a row (null, null) has nullable = true. And I don't think it makes sense to change the planner behavior in general in order to fix a specific use case.

> // you can achieve the same with
> // Schema schema = convertToSchema(type.notNull(), "record")

I don't think we should let each invoker decide whether to make the data type not null, because in the current codebase we should always do that; making every invoker decide is error-prone and hard to maintain. I have merged the `top` flag into a row type nullability switch, see `AvroSchemaConverter.convertToSchema(LogicalType logicalType)`.
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512640835

File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java

```diff
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(LogicalType logicalType) {
-        return convertToSchema(logicalType, "record");
+        return convertToSchema(logicalType, "record", true);
     }
 
     /**
      * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
      *
      * @param logicalType logical type
      * @param rowName the record name
+     * @param top whether it is parsing the root record;
+     *            if it is, the logical type nullability would be ignored
      * @return Avro's {@link Schema} matching this logical type.
      */
-    public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+    public static Schema convertToSchema(
+            LogicalType logicalType,
+            String rowName,
+            boolean top) {
```

Review comment:
Currently, only the Avro format cares about the outer row nullability; we can switch to changing the planner if we find more use cases. Technically speaking, it is impossible to infer the outer row nullability from the DDL alone.
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512618789

File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java

```diff
@@ -104,48 +104,115 @@ public void testRowTypeAvroSchemaConversion() {
             DataTypes.FIELD("row3", DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())))
             .build().toRowDataType().getLogicalType();
         Schema schema = AvroSchemaConverter.convertToSchema(rowType);
-        assertEquals("{\n" +
+        assertEquals("[ {\n" +
```

Review comment:
Thanks, I will add a `schema` -> `DataType` -> `schema` round-trip test.
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512618142

File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java

```diff
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(LogicalType logicalType) {
-        return convertToSchema(logicalType, "record");
+        return convertToSchema(logicalType, "record", true);
     }
 
     /**
      * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
      *
      * @param logicalType logical type
      * @param rowName the record name
+     * @param top whether it is parsing the root record;
+     *            if it is, the logical type nullability would be ignored
      * @return Avro's {@link Schema} matching this logical type.
      */
-    public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+    public static Schema convertToSchema(
+            LogicalType logicalType,
+            String rowName,
+            boolean top) {
```

Review comment:
It is meaningless to have a nullable top-level row type, and in the `CREATE TABLE` DDL there is no way to specify that the table row is not nullable; actually, it should always be not null (even if all the fields are null).
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512616616

File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java

```diff
@@ -181,10 +181,11 @@ public void testSpecificType() throws Exception {
         encoder.flush();
         byte[] input = byteArrayOutputStream.toByteArray();
 
+        // SE/DE SpecificRecord using the GenericRecord way only supports non-nullable data type.
```

Review comment:
Thanks, I will remove it.
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512374210

File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java

```diff
@@ -104,48 +104,115 @@ public void testRowTypeAvroSchemaConversion() {
             DataTypes.FIELD("row3", DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())))
             .build().toRowDataType().getLogicalType();
         Schema schema = AvroSchemaConverter.convertToSchema(rowType);
-        assertEquals("{\n" +
+        assertEquals("[ {\n" +
```

Review comment:
Already added, see `AvroSchemaConverterTest.testConversionIntegralityNullable`.
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512367627

File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java

```diff
@@ -417,4 +431,11 @@ public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
         }
         return builder;
     }
+
+    /** Returns schema with nullable true. */
+    private static Schema nullableSchema(Schema schema) {
```

Review comment:
They are for different purposes, but I think we can use `nullableSchema` for both.
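At the JSON level, a nullable Avro type is represented as a union of `"null"` and the original schema. A minimal string-level sketch of what a `nullableSchema`-style helper produces (the real helper operates on `org.apache.avro.Schema` objects; this bypasses the Avro API entirely and assumes the input is already valid schema JSON):

```java
// String-level sketch of a nullableSchema-style helper: wraps a schema's JSON
// representation into a union with "null". Illustration only; the class and
// method names are hypothetical, not Flink's actual code.
class NullableSchemaSketch {
    static String nullableSchemaJson(String schemaJson) {
        return "[ \"null\", " + schemaJson + " ]";
    }
}
```

This union shape is also why the expected string in `AvroSchemaConverterTest` changed from `"{\n" + ...` to `"[ {\n" + ...` when the row type became nullable: the record schema was wrapped in a union array.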
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r511757745

File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java

```diff
@@ -362,7 +369,11 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
             .record(rowName)
             .fields();
         for (int i = 0; i < rowType.getFieldCount(); i++) {
-            String fieldName = rowName + "_" + fieldNames.get(i);
+            String fieldName = fieldNames.get(i);
+            if (rowName.equals(fieldName)) {
+                // Can not build schema when the record and field have the same name
+                fieldName = rowName + "_" + fieldName;
+            }
```

Review comment:
Yes, the Avro schema builder does not allow a record and a field to share the same name, even if they are in different scopes (different layers).

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
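The collision-avoidance rule in the diff above can be isolated as a small helper (the class and method names are hypothetical, mirroring the inline logic rather than Flink's actual code): only a field whose name equals the enclosing record's name gets the prefix, so the `record_` prefix is no longer applied to every field.

```java
// Hypothetical helper mirroring the renaming logic in the diff: when a field's
// name equals the enclosing record's name, prefix it with the record name so
// the Avro schema builder does not reject the name clash. All other fields
// keep their original names (the point of FLINK-19779).
class FieldNamer {
    static String uniqueFieldName(String rowName, String fieldName) {
        return rowName.equals(fieldName) ? rowName + "_" + fieldName : fieldName;
    }
}
```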