[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-16 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358625975
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Utilities to {@link TableSchema}.
+ */
+@Internal
+public class TableSchemaUtils {
+
+   /**
+    * Return {@link TableSchema} which consists of all physical columns. That means, the computed
+    * columns are filtered out.
+    *
+    * Readers(or writers) such as {@link TableSource} and {@link TableSink} should use this physical
+    * schema to generate {@link TableSource#getProducedDataType()} and {@link TableSource#getTableSchema()}
+    * rather than using the raw TableSchema which may contain computed columns.
+    */
+   public static TableSchema getPhysicalSchema(TableSchema tableSchema) {
+       Preconditions.checkNotNull(tableSchema);
+       TableSchema.Builder builder = new TableSchema.Builder();
+       tableSchema.getTableColumns().forEach(
+           tableColumn -> {
+               if (!tableColumn.isGenerated()) {
+                   builder.field(tableColumn.getName(), tableColumn.getType());
+               }
+           });
+       return builder.build();
+   }
+
+   /**
+    * Returns whether there contains the generated {@link TableColumn} such as computed column and watermark.
+    */
+   public static boolean containsGeneratedColumn(TableSchema tableSchema) {
 
 Review comment:
   Remove unused method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-16 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358625904
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
 ##
 @@ -163,4 +164,59 @@ class SchemaValidatorTest {
     assertTrue(extractor.equals(new CustomExtractor("f3")))
     assertTrue(rowtime.getWatermarkStrategy.isInstanceOf[BoundedOutOfOrderTimestamps])
   }
+
+  @Test
+  def testSchemaWithGeneratedColumnAndWatermark(): Unit = {
+    val descriptor = new Schema()
+      .field("f1", DataTypes.STRING)
+      .field("f2", DataTypes.INT)
+      .field("f3", DataTypes.TIMESTAMP(3))
+
+    val properties = new DescriptorProperties()
+    properties.putProperties(descriptor.toProperties)
+    properties.putString("schema.3.name", "generated-column")
+    properties.putString("schema.3.data-type", DataTypes.INT.toString)
+    properties.putString("schema.3.expr", "f2 + 1")
+    properties.putString("schema.watermark.0.rowtime", "f3")
+    properties.putString("schema.watermark.0.strategy.expr", "f3 - INTERVAL '5' SECOND")
+    properties.putString("schema.watermark.0.strategy.data-type", DataTypes.TIMESTAMP(3).toString)
+
+    new SchemaValidator(true, true, false).validate(properties)
+    val expected = TableSchema.builder()
+      .field("f1", DataTypes.STRING)
+      .field("f2", DataTypes.INT)
+      .field("f3", DataTypes.TIMESTAMP(3))
+      .build()
+    val schema = SchemaValidator.deriveTableSinkSchema(properties)
+    assertEquals(expected, schema)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testSchemaWithMultiWatermark(): Unit = {
 
 Review comment:
   I think we can remove this test, because validation of multiple watermarks is 
already covered by the framework (i.e. TableSchema). 




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358041440
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Utilities to deal physical {@link TableColumn} from {@link TableSchema}.
+ */
+@Internal
+public class TableSchemaUtils {
+
+   /**
+    * Return {@link TableSchema} which consist of all physical columns.
+    * Constructor of {@link TableSource} and {@link TableSink} should call this method.
+    */
+   public static TableSchema getPhysicalTableSchema(TableSchema tableSchema) {
 
 Review comment:
   `getPhysicalSchema`?




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358042919
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
 ##
 @@ -216,6 +218,13 @@ private static void validateTimestampExtractorArguments(
}
}
 
+   public static void validateContainsOnlyPhysicalColumn(TableSchema tableSchema) {
+       if (!TableSchemaUtils.containsOnlyPhysicalColumn(tableSchema)) {
+           throw new ValidationException(
+               "TableSchema of TableSource contains non-physical column.");
 
 Review comment:
   "TableSource#getTableSchema shouldn't contain generated columns, schema: \n" + tableSchema




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358042556
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Utilities to deal physical {@link TableColumn} from {@link TableSchema}.
+ */
+@Internal
+public class TableSchemaUtils {
+
+   /**
+    * Return {@link TableSchema} which consist of all physical columns.
+    * Constructor of {@link TableSource} and {@link TableSink} should call this method.
+    */
+   public static TableSchema getPhysicalTableSchema(TableSchema tableSchema) {
+       Preconditions.checkNotNull(tableSchema);
+       TableSchema.Builder builder = new TableSchema.Builder();
+       tableSchema.getTableColumns().stream().forEach(
 
 Review comment:
   ```suggestion
tableSchema.getTableColumns().forEach(
   ```
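
   A minimal sketch of why this suggestion should be behavior-preserving: `List` 
   already inherits `forEach` from `Iterable`, so the intermediate `stream()` call 
   adds nothing for a plain sequential traversal. The class and method names below 
   are hypothetical, and plain strings stand in for Flink's `TableColumn`.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   public class ForEachSketch {

       // Traversal as written in the pull request: goes through a Stream first.
       public static List<String> viaStream(List<String> columns) {
           List<String> visited = new ArrayList<>();
           columns.stream().forEach(visited::add);
           return visited;
       }

       // Traversal as suggested: Iterable#forEach directly on the list.
       public static List<String> direct(List<String> columns) {
           List<String> visited = new ArrayList<>();
           columns.forEach(visited::add);
           return visited;
       }

       public static void main(String[] args) {
           List<String> columns = Arrays.asList("f1", "f2", "f3");
           // Both variants visit the same elements in the same order.
           System.out.println(viaStream(columns).equals(direct(columns)));
       }
   }
   ```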




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358046608
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java
 ##
 @@ -100,7 +101,7 @@
 * for external output.
 */
    private static TableSchema buildNewTableSchema(Table table) {
-       TableSchema oldSchema = table.getSchema();
+       TableSchema oldSchema = TableSchemaUtils.getPhysicalTableSchema(table.getSchema());
 
 Review comment:
   I think we shouldn't change this line. The schema of `Table` may contain 
generated columns, but the generated columns should also be output. 




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358044466
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
 ##
 @@ -118,6 +128,33 @@ else if (proctimeFound) {
properties.validateExclusion(proctime);
}
}
+
+   validateWatermark(properties);
+   }
+
+   /**
+    * Validate watermarks if exists.
+    */
+   private void validateWatermark(DescriptorProperties properties) {
+       final String schemaWatermarkKey = SCHEMA + "." + WATERMARK;
+       int watermarkRowtimeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_ROWTIME).size();
+       int watermarkStrategyKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_EXPR).size();
+       int watermarkStrategyDataTypeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_DATA_TYPE).size();
+
+       if (watermarkRowtimeKeys == 1 && watermarkStrategyKeys == 1 && watermarkStrategyDataTypeKeys == 1) {
+           String watermarkRowtime = schemaWatermarkKey + "." + 0 + "." + WATERMARK_ROWTIME;
+           String watermarkStrategy = schemaWatermarkKey + "." + 0 + "." + WATERMARK_STRATEGY_EXPR;
+           String watermarkStrategyDataType = schemaWatermarkKey + "." + 0 + "." + WATERMARK_STRATEGY_DATA_TYPE;
+           properties.validateString(watermarkRowtime, false, 1);
+           properties.validateString(watermarkStrategy, false, 1);
+           properties.validateString(watermarkStrategyDataType, false, 1);
+       } else if (watermarkRowtimeKeys == 0 && watermarkStrategyKeys == 0 && watermarkStrategyDataTypeKeys == 0) {
+
 
 Review comment:
   Add a comment on the empty code block, e.g. 
   
   ```
   // do nothing if watermark is not defined
   ```




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358045766
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
 ##
 @@ -118,6 +128,33 @@ else if (proctimeFound) {
properties.validateExclusion(proctime);
}
}
+
+   validateWatermark(properties);
+   }
+
+   /**
+* Validate watermarks if exists.
+*/
+   private void validateWatermark(DescriptorProperties properties) {
 
 Review comment:
   I just remembered that we don't need to validate the watermark on the connector 
side. The watermark properties and computed column properties will be validated 
and parsed by the planner, so we can just remove them from the validation. The only 
thing that connectors need to do is add them to the supported list. 
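
   To illustrate the idea of the "supported list", here is a minimal sketch under 
   stated assumptions; it is not Flink's actual matching code. It assumes that `#` 
   in a supported key stands for a numeric index, and it reuses the key names that 
   appear in the test properties of this pull request (`schema.3.expr`, 
   `schema.watermark.0.rowtime`). The class `SupportedKeysSketch` and its methods 
   are hypothetical.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.regex.Pattern;
   import java.util.stream.Collectors;

   public class SupportedKeysSketch {

       // Key patterns a connector factory might declare as supported.
       // Assumption: '#' is a placeholder for any non-negative index.
       public static final List<String> SUPPORTED = Arrays.asList(
           "schema.#.name",
           "schema.#.data-type",
           "schema.#.expr",
           "schema.watermark.#.rowtime",
           "schema.watermark.#.strategy.expr",
           "schema.watermark.#.strategy.data-type");

       // Turns a pattern into a regex: literal parts are quoted, '#' becomes \d+.
       public static boolean matches(String pattern, String key) {
           String regex = Arrays.stream(pattern.split("#", -1))
               .map(Pattern::quote)
               .collect(Collectors.joining("\\d+"));
           return key.matches(regex);
       }

       // The connector only declares keys; it does not interpret their values.
       public static boolean isSupported(String key) {
           return SUPPORTED.stream().anyMatch(pattern -> matches(pattern, key));
       }

       public static void main(String[] args) {
           System.out.println(isSupported("schema.watermark.0.rowtime"));
           System.out.println(isSupported("schema.3.expr"));
           System.out.println(isSupported("schema.watermark.0.unknown"));
       }
   }
   ```

   In this sketch the connector accepts the watermark and computed column keys 
   without checking their contents, which matches the division of work described 
   above: parsing and validation stay with the planner.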




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358043739
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -108,6 +113,14 @@
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
 
+       // watermark
+       properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
+       properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
+       properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+
+       // computed column
+       properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
 
 Review comment:
   Move this to schema part, i.e. after `SCHEMA_NAME`.




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358042195
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Utilities to deal physical {@link TableColumn} from {@link TableSchema}.
+ */
+@Internal
+public class TableSchemaUtils {
+
+   /**
+    * Return {@link TableSchema} which consist of all physical columns.
+    * Constructor of {@link TableSource} and {@link TableSink} should call this method.
+    */
 
 Review comment:
   Readers (or writers) such as {@link TableSource} and {@link TableSink} should 
use this physical schema to generate {@link TableSource#getProducedDataType()} 
and {@link TableSource#getTableSchema()} rather than using the raw 
`TableSchema`, which may contain computed columns.
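
   A minimal sketch of what "physical schema" means here, assuming a column is 
   just a name plus a generated flag. `Column` and `physicalColumnNames` are 
   hypothetical stand-ins for Flink's `TableColumn` and the utility method under 
   review, and the column names are taken from the test in this pull request.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   public class PhysicalSchemaSketch {

       /** Hypothetical stand-in for TableColumn: a name and a "generated" flag. */
       public static final class Column {
           private final String name;
           private final boolean generated;

           public Column(String name, boolean generated) {
               this.name = name;
               this.generated = generated;
           }

           public String getName() {
               return name;
           }

           public boolean isGenerated() {
               return generated;
           }
       }

       // Keeps only the columns that are physically read or written;
       // computed (generated) columns are dropped.
       public static List<String> physicalColumnNames(List<Column> columns) {
           List<String> names = new ArrayList<>();
           columns.forEach(column -> {
               if (!column.isGenerated()) {
                   names.add(column.getName());
               }
           });
           return names;
       }

       public static void main(String[] args) {
           List<Column> raw = Arrays.asList(
               new Column("f1", false),
               new Column("f2", false),
               new Column("f3", false),
               new Column("generated-column", true));
           System.out.println(physicalColumnNames(raw)); // prints [f1, f2, f3]
       }
   }
   ```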




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358041305
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Utilities to deal physical {@link TableColumn} from {@link TableSchema}.
+ */
+@Internal
+public class TableSchemaUtils {
+
+   /**
+    * Return {@link TableSchema} which consist of all physical columns.
+    * Constructor of {@link TableSource} and {@link TableSink} should call this method.
+    */
+   public static TableSchema getPhysicalTableSchema(TableSchema tableSchema) {
+       Preconditions.checkNotNull(tableSchema);
+       TableSchema.Builder builder = new TableSchema.Builder();
+       tableSchema.getTableColumns().stream().forEach(
+           tableColumn -> {
+               if (!tableColumn.isGenerated()) {
+                   builder.field(tableColumn.getName(), tableColumn.getType());
+               }
+           });
+       return builder.build();
+   }
+
+   /**
+    * Returns whether there only contains the physical {@link TableColumn}.
+    */
+   public static boolean containsOnlyPhysicalColumn(TableSchema tableSchema) {
 
 Review comment:
   Call it `containsGeneratedColumn`?  
   From my understanding, if it contains only physical columns, then it 
shouldn't contain a primary key or watermarks either. 




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358043811
 
 

 ##
 File path: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java
 ##
 @@ -54,7 +55,7 @@ public HBaseUpsertTableSink(
            HBaseWriteOptions writeOptions) {
        checkArgument(hbaseTableSchema.getRowKeyName().isPresent(), "HBaseUpsertTableSink requires rowkey is set.");
        this.hbaseTableSchema = hbaseTableSchema;
-       this.tableSchema = hbaseTableSchema.convertsToTableSchema();
+       this.tableSchema = TableSchemaUtils.getPhysicalTableSchema(hbaseTableSchema.convertsToTableSchema());
 
 Review comment:
   Actually, this is not needed, because 
`hbaseTableSchema.convertsToTableSchema()` returns only the physical schema. 




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358042505
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Utilities to deal physical {@link TableColumn} from {@link TableSchema}.
+ */
+@Internal
+public class TableSchemaUtils {
+
+   /**
+    * Return {@link TableSchema} which consist of all physical columns.
+    * Constructor of {@link TableSource} and {@link TableSink} should call this method.
+    */
+   public static TableSchema getPhysicalTableSchema(TableSchema tableSchema) {
+       Preconditions.checkNotNull(tableSchema);
+       TableSchema.Builder builder = new TableSchema.Builder();
+       tableSchema.getTableColumns().stream().forEach(
+           tableColumn -> {
+               if (!tableColumn.isGenerated()) {
+                   builder.field(tableColumn.getName(), tableColumn.getType());
+               }
+           });
+       return builder.build();
+   }
+
+   /**
+    * Returns whether there only contains the physical {@link TableColumn}.
+    */
+   public static boolean containsOnlyPhysicalColumn(TableSchema tableSchema) {
+       Preconditions.checkNotNull(tableSchema);
+       return tableSchema.getTableColumns().stream().noneMatch(tableColumn -> tableColumn.isGenerated());
 
 Review comment:
   `tableColumn -> tableColumn.isGenerated()` -> `TableColumn::isGenerated`.
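
   A minimal sketch of the two equivalent forms, with a hypothetical `Column` 
   class standing in for Flink's `TableColumn`. Both predicates call the same 
   method on each element, so the method reference only shortens the code.

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class MethodReferenceSketch {

       /** Hypothetical stand-in for TableColumn. */
       public static final class Column {
           private final boolean generated;

           public Column(boolean generated) {
               this.generated = generated;
           }

           public boolean isGenerated() {
               return generated;
           }
       }

       // Form used in the pull request: an explicit lambda.
       public static boolean onlyPhysicalLambda(List<Column> columns) {
           return columns.stream().noneMatch(column -> column.isGenerated());
       }

       // Form suggested in the review: a method reference.
       public static boolean onlyPhysicalMethodRef(List<Column> columns) {
           return columns.stream().noneMatch(Column::isGenerated);
       }

       public static void main(String[] args) {
           List<Column> physicalOnly = Arrays.asList(new Column(false), new Column(false));
           List<Column> withGenerated = Arrays.asList(new Column(false), new Column(true));
           System.out.println(onlyPhysicalLambda(physicalOnly) == onlyPhysicalMethodRef(physicalOnly));
           System.out.println(onlyPhysicalLambda(withGenerated) == onlyPhysicalMethodRef(withGenerated));
       }
   }
   ```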




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358043306
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
 ##
 @@ -67,7 +68,7 @@ protected KafkaTableSinkBase(
Properties properties,
Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
-       this.schema = Preconditions.checkNotNull(schema, "Schema must not be null.");
+       this.schema = Preconditions.checkNotNull(TableSchemaUtils.getPhysicalTableSchema(schema), "Schema must not be null.");
 
 Review comment:
   Check for null before calling `getPhysicalTableSchema`.
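
   A minimal sketch of why the order matters, using `java.util.Objects` instead 
   of Flink's `Preconditions` and a hypothetical `toPhysical` helper in place of 
   `getPhysicalTableSchema`. It assumes, as the quoted utility does, that the 
   helper performs its own null check without a message. With the check placed 
   outside, a null argument fails inside the helper first, so the intended 
   message is never reported.

   ```java
   import java.util.Objects;

   public class NullCheckOrderSketch {

       // Hypothetical stand-in for the schema utility; has its own message-less null check.
       public static String toPhysical(String schema) {
           Objects.requireNonNull(schema);
           return schema.trim();
       }

       // Order used in the pull request: the helper runs before the outer check.
       public static String checkAfter(String schema) {
           return Objects.requireNonNull(toPhysical(schema), "Schema must not be null.");
       }

       // Order requested in the review: validate the argument first.
       public static String checkBefore(String schema) {
           return toPhysical(Objects.requireNonNull(schema, "Schema must not be null."));
       }

       // Returns the exception message produced for a null schema, or a marker if none.
       public static String messageFor(boolean checkFirst) {
           try {
               if (checkFirst) {
                   checkBefore(null);
               } else {
                   checkAfter(null);
               }
               return "<no exception>";
           } catch (NullPointerException e) {
               return e.getMessage() == null ? "<no message>" : e.getMessage();
           }
       }

       public static void main(String[] args) {
           System.out.println("check first: " + messageFor(true));
           System.out.println("check last:  " + messageFor(false));
       }
   }
   ```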




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-15 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r358043379
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
 ##
 @@ -134,6 +139,13 @@
properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
 
+       // watermark
+       properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
+       properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
+       properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+       // computed column
+       properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
 
 Review comment:
   Move this to schema part, i.e. after `SCHEMA_FROM`.




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-12 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r357501589
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
 ##
 @@ -118,6 +131,36 @@ else if (proctimeFound) {
properties.validateExclusion(proctime);
}
}
+
+   validateWatermark(properties);
+   }
+
+   /**
+* Validate watermarks if exists.
+*/
+   private void validateWatermark(DescriptorProperties properties) {
+   final String schemaWatermarkKey = SCHEMA + "." + WATERMARK;
+   int watermarkRowtimeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_ROWTIME).size();
+   int watermarkStrategyKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_EXPR).size();
+   int watermarkStrategyDataTypeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_DATA_TYPE).size();
+
+   if (watermarkRowtimeKeys > 0 || watermarkStrategyKeys > 0 || watermarkStrategyDataTypeKeys > 0) {
+   if (!isStreamEnvironment) {
+   throw new ValidationException(
+   format("Property '%s' is not allowed in a batch environment.", schemaWatermarkKey));
 
 Review comment:
   Please remove this limitation. DDL (or descriptor) should be unified, which 
means the same DDL works in both batch and streaming mode. The difference 
is that watermark information will be ignored in batch mode. 




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-12 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r357499765
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
 ##
 @@ -165,7 +166,7 @@ public void setIsAppendOnly(Boolean isAppendOnly) {
 
@Override
public TypeInformation getRecordType() {
-   return schema.toRowType();
+   return TableSchemaUtils.filterGeneratedColumn(schema).toRowType();
 
 Review comment:
   This is still problematic. The table sink should carry the true **physical** 
schema. Otherwise, for example, if `getTableSchema` returns generated columns, 
the planner will fail in some cases. 




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-12 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r357500858
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
 ##
 @@ -89,6 +95,13 @@ public void validate(DescriptorProperties properties) {
properties.validateString(SCHEMA + "." + i + "." + SCHEMA_NAME, false, 1);
properties.validateDataType(SCHEMA + "." + i + "." + SCHEMA_DATA_TYPE, SCHEMA + "." + i + "." + SCHEMA_TYPE, false);
properties.validateString(SCHEMA + "." + i + "." + SCHEMA_FROM, true, 1);
+
+   // validate generated column expression
+   final String generatedColumnExpressionKey = SCHEMA + "." + i + "." + TABLE_SCHEMA_EXPR;
+   if (properties.containsKey(generatedColumnExpressionKey)) {
+   properties.validateString(generatedColumnExpressionKey, false, 1);
+   }
 
 Review comment:
   Simplify to one line:
   
   ```java
   properties.validateString(SCHEMA + "." + i + "." + TABLE_SCHEMA_EXPR, true, 1);
   ```




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-12 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r357507281
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
 ##
 @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Utilities to filter generated column from {@link TableSchema}.
+ */
+@Internal
+public class TableSchemaUtils {
+
+   public static TableSchema filterGeneratedColumn(TableSchema tableSchema) {
+   Preconditions.checkNotNull(tableSchema);
 
 Review comment:
   What about `TableSchema getPhysicalSchema(TableSchema)`? The returned schema 
contains only physical fields, and doesn't contain keys or watermark information, 
because connectors don't care about that. If a watermark is based on a computed 
column, it is weird to carry the watermark without the referenced column. 
   
   Btw, the name `filterGeneratedColumn` is not correct; it describes the reverse 
behavior. 
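
A minimal sketch of the idea behind the proposed `getPhysicalSchema`, assuming a hypothetical stand-in `Column` class rather than Flink's real `TableColumn`/`TableSchema` API. It keeps only the physical columns and deliberately carries no key or watermark information; the class and method names here are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class PhysicalSchemaSketch {

    /** Hypothetical stand-in for a table column; this is NOT Flink's TableColumn. */
    static final class Column {
        final String name;
        // Present only for computed (generated) columns.
        final Optional<String> expr;

        Column(String name, String expr) {
            this.name = name;
            this.expr = Optional.ofNullable(expr);
        }

        boolean isGenerated() {
            return expr.isPresent();
        }
    }

    /** Returns the names of the physical columns only, preserving their order. */
    static List<String> physicalColumnNames(List<Column> columns) {
        if (columns == null) {
            throw new NullPointerException("columns must not be null");
        }
        List<String> result = new ArrayList<>();
        for (Column column : columns) {
            if (!column.isGenerated()) {
                result.add(column.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Column> columns = Arrays.asList(
            new Column("user_id", null),
            new Column("ts", null),
            new Column("ts_plus_one", "ts + INTERVAL '1' SECOND"));
        // Only the two physical columns remain.
        System.out.println(physicalColumnNames(columns));
    }
}
```

Naming the method after what it returns (the physical schema) avoids the ambiguity of "filter", which can read as either keeping or dropping the generated columns.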




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-12 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r357504240
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
 ##
 @@ -118,6 +131,36 @@ else if (proctimeFound) {
properties.validateExclusion(proctime);
}
}
+
+   validateWatermark(properties);
+   }
+
+   /**
+* Validate watermarks if exists.
+*/
+   private void validateWatermark(DescriptorProperties properties) {
+   final String schemaWatermarkKey = SCHEMA + "." + WATERMARK;
+   int watermarkRowtimeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_ROWTIME).size();
+   int watermarkStrategyKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_EXPR).size();
+   int watermarkStrategyDataTypeKeys = properties.getIndexedProperty(schemaWatermarkKey, WATERMARK_STRATEGY_DATA_TYPE).size();
+
+   if (watermarkRowtimeKeys > 0 || watermarkStrategyKeys > 0 || watermarkStrategyDataTypeKeys > 0) {
 
 Review comment:
   We support at most one watermark definition, and the three indexed property 
lists (rowtime, strategy expression, strategy data type) should have the same size. 
   Please update the validation. 
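
A rough sketch of the requested check, written against plain counts so it stands alone. The class name, method name, and the use of `IllegalArgumentException` (in place of Flink's `ValidationException`) are assumptions for illustration, not the PR's actual code.

```java
public class WatermarkValidationSketch {

    /**
     * Accepts zero or exactly one complete watermark definition.
     * The three arguments stand for the number of indexed entries found for
     * the rowtime attribute, the strategy expression, and the strategy data type.
     */
    static void validateWatermarkCounts(int rowtimeKeys, int strategyExprKeys, int strategyDataTypeKeys) {
        // All three properties must be given together for each watermark.
        if (rowtimeKeys != strategyExprKeys || rowtimeKeys != strategyDataTypeKeys) {
            throw new IllegalArgumentException(
                "Watermark rowtime, strategy expression and strategy data type must be defined together.");
        }
        // At most one watermark definition is supported.
        if (rowtimeKeys > 1) {
            throw new IllegalArgumentException("At most one watermark definition is supported.");
        }
    }

    public static void main(String[] args) {
        validateWatermarkCounts(0, 0, 0); // no watermark: accepted
        validateWatermarkCounts(1, 1, 1); // one complete watermark: accepted
        try {
            validateWatermarkCounts(1, 1, 0); // incomplete definition
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Checking the sizes for equality first means an incomplete definition is reported as such, rather than being masked by the "at most one" check.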




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-12 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r357505503
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
 ##
 @@ -151,7 +163,9 @@ public static TableSchema deriveSchema(Map properties) {
.orElse(false);
final String timestampKey = SCHEMA + '.' + i + '.' + ROWTIME_TIMESTAMPS_TYPE;
final boolean isRowtime = descriptorProperties.containsKey(timestampKey);
-   if (!isProctime && !isRowtime) {
+   if (!isProctime
+   && !isRowtime
+   && baseSchema.getTableColumn(i).map(col -> !col.isGenerated()).orElse(false)) {
 
 Review comment:
   Extract the isGenerated flag before the if condition? 
   
   ```java
   boolean isGeneratedColumn = properties
       .containsKey(SCHEMA + "." + i + "." + TABLE_SCHEMA_EXPR);
   ```




[GitHub] [flink] wuchong commented on a change in pull request #10536: [FLINK-15191][connectors / kafka]Fix can't create table source for Kafka if watermark or computed column is defined.

2019-12-12 Thread GitBox
wuchong commented on a change in pull request #10536: [FLINK-15191][connectors 
/ kafka]Fix can't create table source for Kafka if watermark or computed column 
is defined.
URL: https://github.com/apache/flink/pull/10536#discussion_r357505704
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
 ##
 @@ -59,7 +60,7 @@
 * @param tableSource The {@link TableSource} for which the time attributes are checked.
 */
public static void validateTableSource(TableSource tableSource){
-   TableSchema schema = tableSource.getTableSchema();
+   TableSchema schema = TableSchemaUtils.filterGeneratedColumn(tableSource.getTableSchema());
 
 Review comment:
   We should trust the schema provided by users. 

