[GitHub] [flink] twalthr commented on a change in pull request #17350: [FLINK-24359][table-runtime] Use ResolvedSchema in AbstractFileSystemTable

2021-10-07 Thread GitBox


twalthr commented on a change in pull request #17350:
URL: https://github.com/apache/flink/pull/17350#discussion_r724214646



##
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##
@@ -331,7 +338,7 @@ private RowDataPartitionComputer partitionComputer() {
         return new FileSystemFormatFactory.ReaderContext() {
             @Override
             public TableSchema getSchema() {

Review comment:
   Good point, I hadn't seen that it is deprecated. Then we don't need to make the effort.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] twalthr commented on a change in pull request #17350: [FLINK-24359][table-runtime] Use ResolvedSchema in AbstractFileSystemTable

2021-10-06 Thread GitBox


twalthr commented on a change in pull request #17350:
URL: https://github.com/apache/flink/pull/17350#discussion_r723166638



##
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
##
@@ -39,7 +39,8 @@
     final DynamicTableFactory.Context context;
     final ObjectIdentifier tableIdentifier;
     final Configuration tableOptions;
-    final TableSchema schema;
+    final ResolvedSchema schema;

Review comment:
   Both the schema and the dataType can now be obtained easily from the `context` member; we don't need additional members, and the code stays short and simple. See the sketch below.
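   For illustration only, a minimal sketch of how that could look, assuming the standard `ResolvedCatalogTable#getResolvedSchema` and `ResolvedSchema#toPhysicalRowDataType` accessors (this is not code from the PR):

    // Minimal sketch (assumed API usage, not code from this PR): deriving the
    // resolved schema and the physical row data type from the factory context.
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.factories.DynamicTableFactory;
    import org.apache.flink.table.types.DataType;

    class SchemaFromContextSketch {

        private final DynamicTableFactory.Context context;

        SchemaFromContextSketch(DynamicTableFactory.Context context) {
            this.context = context;
        }

        ResolvedSchema schema() {
            // The resolved catalog table already carries the resolved schema.
            return context.getCatalogTable().getResolvedSchema();
        }

        DataType physicalRowDataType() {
            // The physical row data type can be derived from the resolved schema on demand.
            return context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
        }
    }

   With accessors like these, the extra schema and dataType members of AbstractFileSystemTable become redundant.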

##
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/DeserializationSchemaAdapter.java
##
@@ -73,31 +72,32 @@
 
 public DeserializationSchemaAdapter(
         DeserializationSchema<RowData> deserializationSchema,
-        TableSchema schema,
+        DataType physicalDataType,
         int[] projectFields,
         List<String> partitionKeys,
         String defaultPartValue) {
     this.deserializationSchema = deserializationSchema;
-    this.fieldNames = schema.getFieldNames();
-    this.fieldTypes = schema.getFieldDataTypes();
+    final RowType rowType = ((RowType) physicalDataType.getLogicalType());

Review comment:
   nit: could we use the `DataType.getFieldNames` helper (and the Streams API) here instead of casting to `RowType`? Also for the types? See the sketch below.
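   For illustration only, a sketch of what that could look like, assuming a `DataType.getFieldDataTypes` helper alongside the `DataType.getFieldNames` one already used elsewhere in this PR (not the actual PR code):

    // Illustrative sketch: extracting field names and field data types without
    // casting to RowType. DataType.getFieldDataTypes is an assumed helper here.
    import org.apache.flink.table.types.DataType;

    final class FieldExtractionSketch {

        private FieldExtractionSketch() {}

        static String[] fieldNames(DataType physicalDataType) {
            // Top-level field names of the (row) data type.
            return DataType.getFieldNames(physicalDataType).toArray(new String[0]);
        }

        static DataType[] fieldTypes(DataType physicalDataType) {
            // Same for the field data types, via the Streams API as the comment suggests.
            return DataType.getFieldDataTypes(physicalDataType).stream().toArray(DataType[]::new);
        }
    }

   Whether the Streams API is really needed depends on whether any per-field mapping is applied; for a plain copy, `toArray` on the returned list is enough.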

##
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##
@@ -310,20 +316,18 @@ public String asSummaryString() {
 
     private int[] readFields() {
         return projectedFields == null
-                ? IntStream.range(0, schema.getFieldCount()).toArray()
+                ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray()
                 : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray();
     }
 
-    private DataType getProducedDataType() {
-        int[] fields = readFields();
-        String[] schemaFieldNames = schema.getFieldNames();
-        DataType[] schemaTypes = schema.getFieldDataTypes();
-
-        return DataTypes.ROW(
-                        Arrays.stream(fields)
-                                .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]))
-                                .toArray(DataTypes.Field[]::new))
-                .bridgedTo(RowData.class)
-                .notNull();
+    private DataType getProjectedDataType() {
+        DataType physicalDataType = super.getPhysicalDataType();
Review comment:
   nit: could be `final`

##
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##
@@ -158,8 +159,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
     private RowDataPartitionComputer partitionComputer() {
         return new RowDataPartitionComputer(
                 defaultPartName,
-                schema.getFieldNames(),
-                schema.getFieldDataTypes(),
+                DataType.getFieldNames(getPhysicalDataType()).toArray(new String[] {}),

Review comment:
   nit: in Flink we usually use `toArray(new String[0])` instead of `toArray(new String[] {})`;
   see https://stackoverflow.com/questions/9572795/convert-list-to-array-in-java
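   For illustration only (the list content is made up), the two spellings the comment contrasts:

    // Tiny sketch of the toArray idiom; the field names are made up.
    import java.util.Arrays;
    import java.util.List;

    class ToArrayIdiomSketch {
        public static void main(String[] args) {
            List<String> fieldNames = Arrays.asList("id", "name", "ts");
            String[] preferred = fieldNames.toArray(new String[0]);    // usual Flink style
            String[] alsoValid = fieldNames.toArray(new String[] {});  // equivalent, but discouraged
            System.out.println(Arrays.toString(preferred));
            System.out.println(Arrays.toString(alsoValid));
        }
    }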

##
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##
@@ -331,7 +338,7 @@ private RowDataPartitionComputer partitionComputer() {
         return new FileSystemFormatFactory.ReaderContext() {
             @Override
             public TableSchema getSchema() {

Review comment:
   Can we update this to `DataType`? It would be good to get rid of `TableSchemaUtils`.
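   Purely as an illustration of the suggestion (the method name and shape are assumptions, not the actual `FileSystemFormatFactory.ReaderContext` API), the accessor could expose a `DataType` instead of the deprecated `TableSchema`:

    // Hypothetical sketch only: getPhysicalRowDataType is an assumed name,
    // not part of the real FileSystemFormatFactory.ReaderContext interface.
    import org.apache.flink.table.types.DataType;

    interface ReaderContextSketch {

        /** Physical row type of the table, replacing the deprecated TableSchema accessor. */
        DataType getPhysicalRowDataType();
    }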




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org