[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r358625975 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.util.Preconditions; + +/** + * Utilities to {@link TableSchema}. + */ +@Internal +public class TableSchemaUtils { + + /** +* Return {@link TableSchema} which consists of all physical columns. That means, the computed +* columns are filtered out. 
+* +* Readers(or writers) such as {@link TableSource} and {@link TableSink} should use this physical +* schema to generate {@link TableSource#getProducedDataType()} and {@link TableSource#getTableSchema()} +* rather than using the raw TableSchema which may lead contains computed columns. +*/ + public static TableSchema getPhysicalSchema(TableSchema tableSchema) { + Preconditions.checkNotNull(tableSchema); + TableSchema.Builder builder = new TableSchema.Builder(); + tableSchema.getTableColumns().forEach( + tableColumn -> { + if (!tableColumn.isGenerated()) { + builder.field(tableColumn.getName(), tableColumn.getType()); + } + }); + return builder.build(); + } + + /** +* Returns whether there contains the generated {@link TableColumn} such as computed column and watermark. +*/ + public static boolean containsGeneratedColumn(TableSchema tableSchema) { Review comment: Remove unused method. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358625904

## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala ##
@@ -163,4 +164,59 @@ class SchemaValidatorTest {
     assertTrue(extractor.equals(new CustomExtractor("f3")))
     assertTrue(rowtime.getWatermarkStrategy.isInstanceOf[BoundedOutOfOrderTimestamps])
   }
+
+  @Test
+  def testSchemaWithGeneratedColumnAndWatermark(): Unit = {
+    val descriptor = new Schema()
+      .field("f1", DataTypes.STRING)
+      .field("f2", DataTypes.INT)
+      .field("f3", DataTypes.TIMESTAMP(3))
+
+    val properties = new DescriptorProperties()
+    properties.putProperties(descriptor.toProperties)
+    properties.putString("schema.3.name", "generated-column")
+    properties.putString("schema.3.data-type", DataTypes.INT.toString)
+    properties.putString("schema.3.expr", "f2 + 1")
+    properties.putString("schema.watermark.0.rowtime", "f3")
+    properties.putString("schema.watermark.0.strategy.expr", "f3 - INTERVAL '5' SECOND")
+    properties.putString("schema.watermark.0.strategy.data-type", DataTypes.TIMESTAMP(3).toString)
+
+    new SchemaValidator(true, true, false).validate(properties)
+    val expectd = TableSchema.builder()
+      .field("f1", DataTypes.STRING)
+      .field("f2", DataTypes.INT)
+      .field("f3", DataTypes.TIMESTAMP(3))
+      .build()
+    val schema = SchemaValidator.deriveTableSinkSchema(properties)
+    assertEquals(expectd, schema)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testSchemaWithMultiWatermark(): Unit = {

Review comment:
I think we can remove this test, because validation of multiple watermarks is already covered by the framework (i.e. TableSchema).
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r358041440 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.util.Preconditions; + +/** + * Utilities to deal physical {@link TableColumn} from {@link TableSchema}. + */ +@Internal +public class TableSchemaUtils { + + /** +* Return {@link TableSchema} which consist of all physical columns. +* Constructor of {@link TableSource} and {@link TableSink} should call this method. +*/ + public static TableSchema getPhysicalTableSchema(TableSchema tableSchema) { Review comment: `getPhysicalSchema`? This is an automated message from the Apache Git Service. 
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358042919

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java ##
@@ -216,6 +218,13 @@ private static void validateTimestampExtractorArguments(
 		}
 	}
 
+	public static void validateContainsOnlyPhysicalColumn(TableSchema tableSchema) {
+		if (!TableSchemaUtils.containsOnlyPhysicalColumn(tableSchema)) {
+			throw new ValidationException(
+				"TableSchema of TableSource contains non-physical column.");

Review comment:
"TableSource#getTableSchema shouldn't contain generated columns, schema: \n" + tableSchema
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r358042556 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.util.Preconditions; + +/** + * Utilities to deal physical {@link TableColumn} from {@link TableSchema}. + */ +@Internal +public class TableSchemaUtils { + + /** +* Return {@link TableSchema} which consist of all physical columns. +* Constructor of {@link TableSource} and {@link TableSink} should call this method. 
+	 */
+	public static TableSchema getPhysicalTableSchema(TableSchema tableSchema) {
+		Preconditions.checkNotNull(tableSchema);
+		TableSchema.Builder builder = new TableSchema.Builder();
+		tableSchema.getTableColumns().stream().forEach(

Review comment:
```suggestion
		tableSchema.getTableColumns().forEach(
```
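For readers skimming the suggestion above, a minimal sketch of why the `.stream()` call is redundant here, assuming the collection is a `java.util.List` (as `getTableColumns()` appears to return in this diff). `ForEachSketch` and its method names are hypothetical; `List` inherits `forEach` from `Iterable`, so no stream is needed for a plain traversal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch only: hypothetical helper names, plain strings instead of TableColumn. */
final class ForEachSketch {

    /** Original form: creates a stream only to iterate it. */
    static List<String> viaStream(List<String> names) {
        List<String> out = new ArrayList<>();
        names.stream().forEach(out::add);
        return out;
    }

    /** Suggested form: Iterable#forEach visits the same elements in list order. */
    static List<String> direct(List<String> names) {
        List<String> out = new ArrayList<>();
        names.forEach(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("f1", "f2", "f3");
        System.out.println(viaStream(names).equals(direct(names)));
    }
}
```

Both forms visit the elements in the same order for a sequential list; the direct call simply skips one intermediate object.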
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358046608

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java ##
@@ -100,7 +101,7 @@
 	 * for external output.
 	 */
 	private static TableSchema buildNewTableSchema(Table table) {
-		TableSchema oldSchema = table.getSchema();
+		TableSchema oldSchema = TableSchemaUtils.getPhysicalTableSchema(table.getSchema());

Review comment:
I think we shouldn't change this line. The schema of `Table` may contain generated columns, but the generated columns should also be output.
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358044466

## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java ##
@@ -118,6 +128,33 @@ else if (proctimeFound) {
 				properties.validateExclusion(proctime);
 			}
 		}
+
+		validateWatermark(properties);
+	}
+
+	/**
+	 * Validate watermarks if exists.
+	 */
+	private void validateWatermark(DescriptorProperties properties) {
+		final String schemaWatermarkKey = SCHEMA + "." + WATERMARK;
+		int watermarkRowtimeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_ROWTIME).size();
+		int watermarkStrategyKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_EXPR).size();
+		int watermarkStrategyDataTypeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_DATA_TYPE).size();
+
+		if (watermarkRowtimeKeys == 1 && watermarkStrategyKeys == 1 && watermarkStrategyDataTypeKeys == 1) {
+			String watermarkRowtime = schemaWatermarkKey + "." + 0 + "." + WATERMARK_ROWTIME;
+			String watermarkStrategy = schemaWatermarkKey + "." + 0 + "." + WATERMARK_STRATEGY_EXPR;
+			String watermarkStrategyDataType = schemaWatermarkKey + "." + 0 + "." + WATERMARK_STRATEGY_DATA_TYPE;
+			properties.validateString(watermarkRowtime, false, 1);
+			properties.validateString(watermarkStrategy, false, 1);
+			properties.validateString(watermarkStrategyDataType, false, 1);
+		} else if (watermarkRowtimeKeys == 0 && watermarkStrategyKeys == 0 && watermarkStrategyDataTypeKeys == 0) {
+

Review comment:
Add a comment on the empty code block, e.g.
```
// do nothing if watermark is not defined
```
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358045766

## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java ##
@@ -118,6 +128,33 @@ else if (proctimeFound) {
 				properties.validateExclusion(proctime);
 			}
 		}
+
+		validateWatermark(properties);
+	}
+
+	/**
+	 * Validate watermarks if exists.
+	 */
+	private void validateWatermark(DescriptorProperties properties) {

Review comment:
I just remembered that we don't need to validate the watermark on the connector side. The watermark properties and computed column properties will be validated and parsed by the planner, so we can just remove them from the validation. The only thing connectors need to do is add them to the supported list.
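To illustrate what "adding them to the supported list" amounts to, here is a minimal sketch. The key strings are taken from the property names used in the test in this PR (`schema.3.expr`, `schema.watermark.0.rowtime`, and so on); the `#` wildcard follows the `".#."` convention visible in the factory diffs. The matcher `isSupported` and the class name are hypothetical and are not how Flink's factory discovery is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Sketch only: a hypothetical matcher, not Flink's table factory service. */
final class SupportedKeysSketch {

    /** Keys a connector factory would declare; '#' stands for a numeric index. */
    static List<String> supportedKeys() {
        List<String> keys = new ArrayList<>();
        keys.add("schema.#.name");
        keys.add("schema.#.data-type");
        keys.add("schema.#.expr");                          // computed column
        keys.add("schema.watermark.#.rowtime");             // watermark
        keys.add("schema.watermark.#.strategy.expr");
        keys.add("schema.watermark.#.strategy.data-type");
        return keys;
    }

    /** Returns true if the concrete key matches one of the declared patterns. */
    static boolean isSupported(String key, List<String> patterns) {
        for (String pattern : patterns) {
            // Quote the literal parts and let '#' match a run of digits.
            String regex = Pattern.quote(pattern).replace("#", "\\E\\d+\\Q");
            if (Pattern.matches(regex, key)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> keys = supportedKeys();
        System.out.println(isSupported("schema.watermark.0.rowtime", keys));
        System.out.println(isSupported("schema.3.expr", keys));
    }
}
```

The connector only declares that the keys are acceptable; under the approach described in the comment, checking their values stays with the planner.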
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358043739

## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ##
@@ -108,6 +113,14 @@
 		properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
 		properties.add(SCHEMA + ".#." + SCHEMA_NAME);
+		// watermark
+		properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
+		properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
+		properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+
+		// computed column
+		properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);

Review comment:
Move this to schema part, i.e. after `SCHEMA_NAME`.
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r358042195 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.util.Preconditions; + +/** + * Utilities to deal physical {@link TableColumn} from {@link TableSchema}. + */ +@Internal +public class TableSchemaUtils { + + /** +* Return {@link TableSchema} which consist of all physical columns. +* Constructor of {@link TableSource} and {@link TableSink} should call this method. 
+	 */

Review comment:
Readers (or writers) such as {@link TableSource} and {@link TableSink} should use this physical schema to generate {@link TableSource#getProducedDataType()} and {@link TableSource#getTableSchema()} rather than using the raw `TableSchema`, which may contain computed columns.
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r358041305 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.util.Preconditions; + +/** + * Utilities to deal physical {@link TableColumn} from {@link TableSchema}. + */ +@Internal +public class TableSchemaUtils { + + /** +* Return {@link TableSchema} which consist of all physical columns. +* Constructor of {@link TableSource} and {@link TableSink} should call this method. 
+	 */
+	public static TableSchema getPhysicalTableSchema(TableSchema tableSchema) {
+		Preconditions.checkNotNull(tableSchema);
+		TableSchema.Builder builder = new TableSchema.Builder();
+		tableSchema.getTableColumns().stream().forEach(
+			tableColumn -> {
+				if (!tableColumn.isGenerated()) {
+					builder.field(tableColumn.getName(), tableColumn.getType());
+				}
+			});
+		return builder.build();
+	}
+
+	/**
+	 * Returns whether there only contains the physical {@link TableColumn}.
+	 */
+	public static boolean containsOnlyPhysicalColumn(TableSchema tableSchema) {

Review comment:
Call it `containsGeneratedColumn`? From my understanding, if it only contains physical columns, then it shouldn't contain a primary key or watermarks either.
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358043811

## File path: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java ##
@@ -54,7 +55,7 @@ public HBaseUpsertTableSink(
 			HBaseWriteOptions writeOptions) {
 		checkArgument(hbaseTableSchema.getRowKeyName().isPresent(), "HBaseUpsertTableSink requires rowkey is set.");
 		this.hbaseTableSchema = hbaseTableSchema;
-		this.tableSchema = hbaseTableSchema.convertsToTableSchema();
+		this.tableSchema = TableSchemaUtils.getPhysicalTableSchema(hbaseTableSchema.convertsToTableSchema());

Review comment:
Actually, this is not needed, because `hbaseTableSchema.convertsToTableSchema()` returns only the physical schema.
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r358042505 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.util.Preconditions; + +/** + * Utilities to deal physical {@link TableColumn} from {@link TableSchema}. + */ +@Internal +public class TableSchemaUtils { + + /** +* Return {@link TableSchema} which consist of all physical columns. +* Constructor of {@link TableSource} and {@link TableSink} should call this method. 
+	 */
+	public static TableSchema getPhysicalTableSchema(TableSchema tableSchema) {
+		Preconditions.checkNotNull(tableSchema);
+		TableSchema.Builder builder = new TableSchema.Builder();
+		tableSchema.getTableColumns().stream().forEach(
+			tableColumn -> {
+				if (!tableColumn.isGenerated()) {
+					builder.field(tableColumn.getName(), tableColumn.getType());
+				}
+			});
+		return builder.build();
+	}
+
+	/**
+	 * Returns whether there only contains the physical {@link TableColumn}.
+	 */
+	public static boolean containsOnlyPhysicalColumn(TableSchema tableSchema) {
+		Preconditions.checkNotNull(tableSchema);
+		return tableSchema.getTableColumns().stream().noneMatch(tableColumn -> tableColumn.isGenerated());

Review comment:
`tableColumn -> tableColumn.isGenerated()` -> `TableColumn::isGenerated`.
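A small self-contained sketch of the substitution suggested above. `Column` is a hypothetical stand-in for `TableColumn` with only the one accessor needed; the point is that a lambda which merely forwards to a no-argument instance method can be written as a method reference with the same behaviour.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: Column is a hypothetical stand-in, not Flink's TableColumn. */
final class MethodReferenceSketch {

    static final class Column {
        private final boolean generated;

        Column(boolean generated) {
            this.generated = generated;
        }

        boolean isGenerated() {
            return generated;
        }
    }

    /** Lambda form, as written in the diff. */
    static boolean noneGeneratedWithLambda(List<Column> columns) {
        return columns.stream().noneMatch(column -> column.isGenerated());
    }

    /** Method-reference form, as suggested in the review. */
    static boolean noneGeneratedWithMethodRef(List<Column> columns) {
        return columns.stream().noneMatch(Column::isGenerated);
    }

    public static void main(String[] args) {
        List<Column> columns = Arrays.asList(new Column(false), new Column(true));
        System.out.println(noneGeneratedWithLambda(columns) == noneGeneratedWithMethodRef(columns));
    }
}
```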
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358043306

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java ##
@@ -67,7 +68,7 @@ protected KafkaTableSinkBase(
 			Properties properties,
 			Optional> partitioner,
 			SerializationSchema serializationSchema) {
-		this.schema = Preconditions.checkNotNull(schema, "Schema must not be null.");
+		this.schema = Preconditions.checkNotNull(TableSchemaUtils.getPhysicalTableSchema(schema), "Schema must not be null.");

Review comment:
Check for null before calling `getPhysicalTableSchema`.
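The ordering issue can be sketched without Flink. In the diff, the argument is dereferenced inside the wrapped call before the outer check runs, so the "Schema must not be null." message is never the one reported for a null argument. The sketch below uses `java.util.Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`, and `normalize` is a hypothetical stand-in for `getPhysicalTableSchema`.

```java
import java.util.Objects;

/** Sketch only: normalize() is a hypothetical stand-in for getPhysicalTableSchema. */
final class NullCheckOrderSketch {

    /** Dereferences its argument, so a null input fails here. */
    static String normalize(String schema) {
        return schema.trim();
    }

    /** Check wrapped around the call: a null argument fails inside normalize first. */
    static String checkAfter(String schema) {
        return Objects.requireNonNull(normalize(schema), "Schema must not be null.");
    }

    /** Check placed before the call: a null argument fails with the intended message. */
    static String checkBefore(String schema) {
        Objects.requireNonNull(schema, "Schema must not be null.");
        return normalize(schema);
    }

    public static void main(String[] args) {
        try {
            checkBefore(null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```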
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358043379

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ##
@@ -134,6 +139,13 @@
 		properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
 		properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
+		// watermark
+		properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
+		properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
+		properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+		// computed column
+		properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);

Review comment:
Move this to schema part, i.e. after `SCHEMA_FROM`.
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r357501589

## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java ##
@@ -118,6 +131,36 @@ else if (proctimeFound) {
 				properties.validateExclusion(proctime);
 			}
 		}
+
+		validateWatermark(properties);
+	}
+
+	/**
+	 * Validate watermarks if exists.
+	 */
+	private void validateWatermark(DescriptorProperties properties) {
+		final String schemaWatermarkKey = SCHEMA + "." + WATERMARK;
+		int watermarkRowtimeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_ROWTIME).size();
+		int watermarkStrategyKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_EXPR).size();
+		int watermarkStrategyDataTypeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_DATA_TYPE).size();
+
+		if (watermarkRowtimeKeys > 0 || watermarkStrategyKeys > 0 || watermarkStrategyDataTypeKeys > 0) {
+			if (!isStreamEnvironment) {
+				throw new ValidationException(
+					format("Property '%s' is not allowed in a batch environment.", schemaWatermarkKey));

Review comment:
Please remove this limitation. DDL (or descriptor) should be unified, which means the same DDL can work in both batch and streaming mode. The difference is that watermark information will be ignored in batch mode.
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r357499765 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java ## @@ -165,7 +166,7 @@ public void setIsAppendOnly(Boolean isAppendOnly) { @Override public TypeInformation getRecordType() { - return schema.toRowType(); + return TableSchemaUtils.filterGeneratedColumn(schema).toRowType(); Review comment: This is still problematic. The table sink should carry the true **physical** schema. Otherwise, for example, if `getTableSchema` returns generated columns, the planner will fail in some cases.
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r357500858 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java ## @@ -89,6 +95,13 @@ public void validate(DescriptorProperties properties) { properties.validateString(SCHEMA + "." + i + "." + SCHEMA_NAME, false, 1); properties.validateDataType(SCHEMA + "." + i + "." + SCHEMA_DATA_TYPE, SCHEMA + "." + i + "." + SCHEMA_TYPE, false); properties.validateString(SCHEMA + "." + i + "." + SCHEMA_FROM, true, 1); + + // validate generated column expression + final String generatedColumnExpressionKey = SCHEMA + "." + i + "." + TABLE_SCHEMA_EXPR; + if (properties.containsKey(generatedColumnExpressionKey)) { + properties.validateString(generatedColumnExpressionKey, false, 1); + } Review comment: Simplify to one line: ```java properties.validateString(SCHEMA + "." + i + "." + TABLE_SCHEMA_EXPR, true, 1); ```
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r357507281 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.util.Preconditions; + +/** + * Utilities to filter generated column from {@link TableSchema}. + */ +@Internal +public class TableSchemaUtils { + + public static TableSchema filterGeneratedColumn(TableSchema tableSchema) { + Preconditions.checkNotNull(tableSchema); Review comment: What about `TableSchema getPhysicalSchema(TableSchema)`? The returned schema contains only physical fields and doesn't contain keys or watermark information, because connectors don't care about those. If a watermark is based on a computed column, it is weird to carry the watermark without the referenced column. 
Btw, the name `filterGeneratedColumn` is not correct; it describes the reverse behavior.
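To illustrate the suggestion in the comment above, here is a minimal, self-contained sketch of what a `getPhysicalSchema`-style helper does: it keeps only the columns that have no generation expression and carries no watermark or key information. `SketchColumn` and `PhysicalSchemaSketch` are hypothetical stand-ins invented for this example, not Flink classes; the real helper would operate on `TableSchema` and `TableColumn`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a table column; NOT Flink's TableColumn. */
final class SketchColumn {
    final String name;
    final String type;
    final String expr; // null for a physical column, non-null for a computed column

    SketchColumn(String name, String type, String expr) {
        this.name = name;
        this.type = type;
        this.expr = expr;
    }

    /** A column is generated when it is defined by an expression. */
    boolean isGenerated() {
        return expr != null;
    }
}

/** Hypothetical helper mirroring the idea behind getPhysicalSchema. */
final class PhysicalSchemaSketch {

    /** Returns only the physical columns, preserving their declared order. */
    static List<SketchColumn> getPhysicalColumns(List<SketchColumn> columns) {
        if (columns == null) {
            throw new NullPointerException("columns must not be null");
        }
        List<SketchColumn> physical = new ArrayList<>();
        for (SketchColumn column : columns) {
            if (!column.isGenerated()) {
                physical.add(column);
            }
        }
        return physical;
    }
}
```

The method name says what is kept rather than what is dropped, which avoids the ambiguity of "filter" that the comment points out.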
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r357504240 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java ## @@ -118,6 +131,36 @@ else if (proctimeFound) { properties.validateExclusion(proctime); } } + + validateWatermark(properties); + } + + /** +* Validate watermarks if exists. +*/ + private void validateWatermark(DescriptorProperties properties) { + final String schemaWatermarkKey = SCHEMA + "." + WATERMARK; + int watermarkRowtimeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_ROWTIME).size(); + int watermarkStrategyKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_EXPR).size(); + int watermarkStrategyDataTypeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_DATA_TYPE).size(); + + if (watermarkRowtimeKeys > 0 || watermarkStrategyKeys > 0 || watermarkStrategyDataTypeKeys > 0) { Review comment: We support at most one watermark definition, and the sizes of the three indexed properties should be the same. Please update the validation.
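The check requested above could be sketched roughly as follows. This is only an illustration under assumptions: `WatermarkValidationSketch` is a hypothetical class, the three counts are passed in directly instead of being read from `DescriptorProperties`, and `IllegalArgumentException` stands in for whatever exception the real validator would throw (presumably `ValidationException`).

```java
/** Hypothetical sketch of the requested watermark validation; not Flink code. */
final class WatermarkValidationSketch {

    /**
     * Validates the number of keys found for each watermark property.
     * All three counts must be equal, and at most one definition is allowed.
     */
    static void validateWatermarkCounts(
            int rowtimeKeys, int strategyExprKeys, int strategyDataTypeKeys) {
        // Every watermark definition needs a rowtime, a strategy expression and a
        // strategy data type, so the three counts have to match.
        if (rowtimeKeys != strategyExprKeys || rowtimeKeys != strategyDataTypeKeys) {
            throw new IllegalArgumentException(
                "Watermark rowtime, strategy expression and strategy data type "
                    + "must be defined together, but found "
                    + rowtimeKeys + ", " + strategyExprKeys + " and "
                    + strategyDataTypeKeys + " keys.");
        }
        // Zero definitions is fine (no watermark); more than one is rejected.
        if (rowtimeKeys > 1) {
            throw new IllegalArgumentException(
                "At most one watermark definition is supported, but found "
                    + rowtimeKeys + ".");
        }
    }
}
```

Checking equality first means an incomplete definition is reported as such, rather than being misreported as "too many watermarks".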
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r357505503 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java ## @@ -151,7 +163,9 @@ public static TableSchema deriveSchema(Map properties) { .orElse(false); final String timestampKey = SCHEMA + '.' + i + '.' + ROWTIME_TIMESTAMPS_TYPE; final boolean isRowtime = descriptorProperties.containsKey(timestampKey); - if (!isProctime && !isRowtime) { + if (!isProctime + && !isRowtime + && baseSchema.getTableColumn(i).map(col -> !col.isGenerated()).orElse(false)) { Review comment: Extract the `isGenerated` flag before the `if` condition? ```java boolean isGeneratedColumn = properties .containsKey(SCHEMA + "." + i + "." + TABLE_SCHEMA_EXPR); ```
[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined. URL: https://github.com/apache/flink/pull/10536#discussion_r357505704 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java ## @@ -59,7 +60,7 @@ * @param tableSource The {@link TableSource} for which the time attributes are checked. */ public static void validateTableSource(TableSource tableSource){ - TableSchema schema = tableSource.getTableSchema(); + TableSchema schema = TableSchemaUtils.filterGeneratedColumn(tableSource.getTableSchema()); Review comment: We should trust the schema provided by users.