[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-28 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r513927391



##
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
 * @return Avro's {@link Schema} matching this logical type.
 */
public static Schema convertToSchema(LogicalType logicalType) {
-   return convertToSchema(logicalType, "record");
+   return convertToSchema(logicalType, "record", true);
}
 
/**
 * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro 
schema.
 *
 * @param logicalType logical type
 * @param rowName the record name
+* @param top whether it is parsing the root record,
+*if it is, the logical type nullability would be 
ignored
 * @return Avro's {@link Schema} matching this logical type.
 */
-   public static Schema convertToSchema(LogicalType logicalType, String 
rowName) {
+   public static Schema convertToSchema(
+   LogicalType logicalType,
+   String rowName,
+   boolean top) {

Review comment:
   @dawidwys I have changed the schema row type to always be non-nullable; please take another look when you have time. Thanks so much.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-28 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r513370899



##
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
 * @return Avro's {@link Schema} matching this logical type.
 */
public static Schema convertToSchema(LogicalType logicalType) {
-   return convertToSchema(logicalType, "record");
+   return convertToSchema(logicalType, "record", true);
}
 
/**
 * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro 
schema.
 *
 * @param logicalType logical type
 * @param rowName the record name
+* @param top whether it is parsing the root record,
+*if it is, the logical type nullability would be 
ignored
 * @return Avro's {@link Schema} matching this logical type.
 */
-   public static Schema convertToSchema(LogicalType logicalType, String 
rowName) {
+   public static Schema convertToSchema(
+   LogicalType logicalType,
+   String rowName,
+   boolean top) {

Review comment:
   I had an offline discussion with @wuchong and @dawidwys, and after some research we found that a non-nullable row type is more reasonable.
   
   But because the change is huge (much of the code that converts a type info to a data type previously assumed nullable true), @wuchong and I decided to change the method signature to `convertToSchema(RowType schema)` and add a note to the method doc that the passed-in `schema` must be the top-level record type.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-28 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r513264987



##
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
 * @return Avro's {@link Schema} matching this logical type.
 */
public static Schema convertToSchema(LogicalType logicalType) {
-   return convertToSchema(logicalType, "record");
+   return convertToSchema(logicalType, "record", true);
}
 
/**
 * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro 
schema.
 *
 * @param logicalType logical type
 * @param rowName the record name
+* @param top whether it is parsing the root record,
+*if it is, the logical type nullability would be 
ignored
 * @return Avro's {@link Schema} matching this logical type.
 */
-   public static Schema convertToSchema(LogicalType logicalType, String 
rowName) {
+   public static Schema convertToSchema(
+   LogicalType logicalType,
+   String rowName,
+   boolean top) {

Review comment:
   >Excuse me, but I wholeheartedly disagree with your statement. null =/= 
(null, null). (null, null) is still NOT NULL. A whole row in a Table can not be 
null. Only particular columns can be null. Therefore the top level row of a 
Table is always NOT NULL.
   
   You can test it in PostgreSQL with the following SQL:
   ```sql
   create type my_type as (a int, b varchar(20));
   
   create table t1(
 f0 my_type,
 f1 varchar(20)
   );
   
   insert into t1 values((null, null), 'def');
   
   select f0 is null from t1; -- it returns true
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-27 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r513162229



##
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##
@@ -297,32 +300,53 @@ private static DataType convertToDataType(Schema schema) {
 * @return Avro's {@link Schema} matching this logical type.
 */
public static Schema convertToSchema(LogicalType logicalType) {
+   // If it is parsing the root row type, switches from nullable 
true to false
+   // because a nullable row type is meaningless and would 
generate wrong schema.
+   if (logicalType.getTypeRoot() == LogicalTypeRoot.ROW

Review comment:
   Although `AvroSchemaConverter` is a utility class, it is still used as a public API, so I'm inclined to keep the signature unchanged. Another reason is that the logical type alone is enough for the conversion.
   
   I have added more documentation to the methods.
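
   The nullability switch described here can be sketched with simplified stand-ins (the `LogicalType` record below is a toy model, not Flink's actual class):

```java
// Minimal sketch of the top-level nullability switch discussed above.
// "LogicalType" here is a simplified stand-in, not Flink's actual class.
public class NullabilitySketch {
    // A toy logical type: just a root kind and a nullable flag.
    record LogicalType(String typeRoot, boolean nullable) {
        LogicalType notNull() { return new LogicalType(typeRoot, false); }
    }

    // Mirrors the idea in convertToSchema(LogicalType): if the root type is a
    // nullable ROW, force it to NOT NULL before converting, because a nullable
    // top-level record would otherwise be wrapped in a ["null", record] union.
    static LogicalType normalizeTopLevel(LogicalType t) {
        if (t.typeRoot().equals("ROW") && t.nullable()) {
            return t.notNull();
        }
        return t;
    }

    public static void main(String[] args) {
        LogicalType nullableRow = new LogicalType("ROW", true);
        System.out.println(normalizeTopLevel(nullableRow).nullable());
    }
}
```

   Nested types are untouched; only the root record loses its nullability.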





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-27 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r513135789



##
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
 * @return Avro's {@link Schema} matching this logical type.
 */
public static Schema convertToSchema(LogicalType logicalType) {
-   return convertToSchema(logicalType, "record");
+   return convertToSchema(logicalType, "record", true);
}
 
/**
 * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro 
schema.
 *
 * @param logicalType logical type
 * @param rowName the record name
+* @param top whether it is parsing the root record,
+*if it is, the logical type nullability would be 
ignored
 * @return Avro's {@link Schema} matching this logical type.
 */
-   public static Schema convertToSchema(LogicalType logicalType, String 
rowName) {
+   public static Schema convertToSchema(
+   LogicalType logicalType,
+   String rowName,
+   boolean top) {

Review comment:
   > `In SQL the outer row is always NOT NULL`
   
   That is not true: a row `(null, null)` is still nullable. And I don't think it makes sense to change the planner behavior in general in order to fix a specific use case.
   
   > // you can achieve the same with
   > // Schema schema = convertToSchema(type.notNull(), "record")
   
   I don't think we should let each invoker decide whether to make the data type not null, because in the current codebase we should always do that; leaving the decision to every caller is error-prone and hard to maintain.
   
   I have merged the `top` flag into a row type nullability switch; see `AvroSchemaConverter.convertToSchema(LogicalType logicalType)`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-27 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512640835



##
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
 * @return Avro's {@link Schema} matching this logical type.
 */
public static Schema convertToSchema(LogicalType logicalType) {
-   return convertToSchema(logicalType, "record");
+   return convertToSchema(logicalType, "record", true);
}
 
/**
 * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro 
schema.
 *
 * @param logicalType logical type
 * @param rowName the record name
+* @param top whether it is parsing the root record,
+*if it is, the logical type nullability would be 
ignored
 * @return Avro's {@link Schema} matching this logical type.
 */
-   public static Schema convertToSchema(LogicalType logicalType, String 
rowName) {
+   public static Schema convertToSchema(
+   LogicalType logicalType,
+   String rowName,
+   boolean top) {

Review comment:
   Currently only the Avro format cares about outer row nullability; we can switch to changing the planner if we find more use cases.
   
   Technically speaking, it is impossible to infer the outer row nullability from the DDL alone.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-27 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512618789



##
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
##
@@ -104,48 +104,115 @@ public void testRowTypeAvroSchemaConversion() {
DataTypes.FIELD("row3", 
DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())
.build().toRowDataType().getLogicalType();
Schema schema = AvroSchemaConverter.convertToSchema(rowType);
-   assertEquals("{\n" +
+   assertEquals("[ {\n" +

Review comment:
   Thanks, I will add a `schema` -> `DataType` -> `schema` round-trip test.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-27 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512618142



##
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##
@@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) {
 * @return Avro's {@link Schema} matching this logical type.
 */
public static Schema convertToSchema(LogicalType logicalType) {
-   return convertToSchema(logicalType, "record");
+   return convertToSchema(logicalType, "record", true);
}
 
/**
 * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro 
schema.
 *
 * @param logicalType logical type
 * @param rowName the record name
+* @param top whether it is parsing the root record,
+*if it is, the logical type nullability would be 
ignored
 * @return Avro's {@link Schema} matching this logical type.
 */
-   public static Schema convertToSchema(LogicalType logicalType, String 
rowName) {
+   public static Schema convertToSchema(
+   LogicalType logicalType,
+   String rowName,
+   boolean top) {

Review comment:
   It is meaningless to have a nullable top-level row type, and in the `CREATE TABLE` DDL there is no way to specify that the table row is not nullable; in fact it should always be NOT NULL (even if all the fields are null).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-27 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512616616



##
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
##
@@ -181,10 +181,11 @@ public void testSpecificType() throws Exception {
encoder.flush();
byte[] input = byteArrayOutputStream.toByteArray();
 
+   // SE/DE SpecificRecord using the GenericRecord way only 
supports non-nullable data type.

Review comment:
   Thanks, I will remove it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-26 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512374210



##
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
##
@@ -104,48 +104,115 @@ public void testRowTypeAvroSchemaConversion() {
DataTypes.FIELD("row3", 
DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())
.build().toRowDataType().getLogicalType();
Schema schema = AvroSchemaConverter.convertToSchema(rowType);
-   assertEquals("{\n" +
+   assertEquals("[ {\n" +

Review comment:
   Already added, see 
`AvroSchemaConverterTest.testConversionIntegralityNullable`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-26 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r512367627



##
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##
@@ -417,4 +431,11 @@ public static LogicalType 
extractValueTypeToAvroMap(LogicalType type) {
}
return builder;
}
+
+   /** Returns schema with nullable true. */
+   private static Schema nullableSchema(Schema schema) {

Review comment:
   They are for different purposes, but I think we can use `nullableSchema` for both.
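
   As a rough illustration of what a `nullableSchema` helper does (the real code would use Avro's `Schema.createUnion`; this sketch only mimics the resulting JSON shape with plain strings):

```java
// A nullable Avro type is a union of "null" and the original schema.
// This stand-in works on schema JSON strings instead of Avro Schema objects.
public class NullableSchemaSketch {
    static String nullableSchema(String schemaJson) {
        // Already a union starting with "null"? Leave it unchanged.
        if (schemaJson.startsWith("[ \"null\",")) {
            return schemaJson;
        }
        return "[ \"null\", " + schemaJson + " ]";
    }

    public static void main(String[] args) {
        System.out.println(nullableSchema("\"string\""));
        System.out.println(nullableSchema("[ \"null\", \"string\" ]"));
    }
}
```

   Wrapping is idempotent: a schema that is already a `["null", ...]` union is returned as-is.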





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…

2020-10-26 Thread GitBox


danny0405 commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r511757745



##
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##
@@ -362,7 +369,11 @@ public static Schema convertToSchema(LogicalType 
logicalType, String rowName) {
.record(rowName)
.fields();
for (int i = 0; i < rowType.getFieldCount(); 
i++) {
-   String fieldName = rowName + "_" + 
fieldNames.get(i);
+   String fieldName = fieldNames.get(i);
+   if (rowName.equals(fieldName)) {
+   // Can not build schema when 
the record and field have the same name
+   fieldName = rowName + "_" + 
fieldName;
+   }

Review comment:
   Yes, the Avro schema builder does not allow fields with the same name, even if they are in different scopes (different layers).
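
   The renaming rule in the diff above can be sketched as a small helper (a simplified stand-in, not Flink's actual code):

```java
// Sketch of the field-renaming rule from the diff above: when a field has the
// same name as its enclosing record, prefix it with the record name so the
// Avro schema builder does not reject the schema.
public class FieldNameSketch {
    static String disambiguate(String rowName, String fieldName) {
        if (rowName.equals(fieldName)) {
            // Cannot build the schema when record and field share a name.
            return rowName + "_" + fieldName;
        }
        return fieldName;
    }

    public static void main(String[] args) {
        System.out.println(disambiguate("record", "record"));
        System.out.println(disambiguate("record", "f0"));
    }
}
```

   Unlike the old behavior, only colliding fields get the `rowName_` prefix; all other field names pass through unchanged.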





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org