[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224913#comment-16224913 ]

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4894

> Support watermark generation for TableSource
> --------------------------------------------
>
>                 Key: FLINK-7548
>                 URL: https://issues.apache.org/jira/browse/FLINK-7548
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Fabian Hueske
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, a TableSource can currently only define a
> rowtime field; it cannot extract watermarks from that field.
> We can provide a new interface called {{DefinedWatermark}}, which has two
> methods: {{getRowtimeAttribute}} (can only be an existing field) and
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked
> deprecated.
> How to support periodic and punctuated watermarks, and which built-in
> strategies to provide, needs further discussion.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
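The interface proposed in the issue description can be pictured roughly as follows. This is only a sketch under stated assumptions: the issue names `DefinedWatermark`, `getRowtimeAttribute` and `getWatermarkGenerator` but gives no signatures, so the return types, the `WatermarkGenerator` trait, `BoundedDelayGenerator` and `OrdersSource` are all hypothetical. The diffs quoted elsewhere in this thread use different names (`RowtimeAttributeDescriptor`, `PeriodicWatermarkAssigner`), so this is not the merged API.

```scala
// Hypothetical stand-in: the issue does not say what a watermark generator looks like.
trait WatermarkGenerator {
  /** Observes the next rowtime value, in epoch milliseconds. */
  def observe(timestamp: Long): Unit

  /** Returns the current watermark, in epoch milliseconds. */
  def currentWatermark: Long
}

// Sketch of the interface proposed in the issue text; both return types are assumptions.
trait DefinedWatermark {
  /** Name of an existing field that carries the rowtime. */
  def getRowtimeAttribute: String

  /** Generator that derives watermarks from the rowtime values. */
  def getWatermarkGenerator: WatermarkGenerator
}

// Example generator (assumed strategy): the watermark trails the largest
// timestamp seen so far by a fixed delay.
final class BoundedDelayGenerator(delayMs: Long) extends WatermarkGenerator {
  require(delayMs >= 0, "delayMs must be non-negative")
  private var maxTimestamp: Long = Long.MinValue + delayMs

  override def observe(timestamp: Long): Unit =
    if (timestamp > maxTimestamp) maxTimestamp = timestamp

  override def currentWatermark: Long = maxTimestamp - delayMs
}

// Hypothetical source that declares "orderTime" as rowtime with a one-second delay.
final class OrdersSource extends DefinedWatermark {
  override def getRowtimeAttribute: String = "orderTime"
  override def getWatermarkGenerator: WatermarkGenerator = new BoundedDelayGenerator(1000L)
}
```

Keeping the attribute name and the generator on one interface means a source cannot declare a rowtime field without also saying how watermarks are produced for it, which is the gap the issue describes.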
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224884#comment-16224884 ]

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4894

    Merging
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224616#comment-16224616 ]

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147661477

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala ---
    @@ -38,23 +38,26 @@ import scala.collection.mutable
       * @param path The path to the CSV file.
       * @param fieldNames The names of the table fields.
       * @param fieldTypes The types of the table fields.
    +  * @param selectedFields The fields which will be read and returned by the table source.
    +  *                       If None, all fields are returned.
       * @param fieldDelim The field delimiter, "," by default.
       * @param rowDelim The row delimiter, "\n" by default.
       * @param quoteCharacter An optional quote character for String values, null by default.
       * @param ignoreFirstLine Flag to ignore the first line, false by default.
       * @param ignoreComments An optional prefix to indicate comments, null by default.
       * @param lenient Flag to skip records with parse error instead to fail, false by default.
       */
    -class CsvTableSource(
    +class CsvTableSource private (
         private val path: String,
         private val fieldNames: Array[String],
         private val fieldTypes: Array[TypeInformation[_]],
    -    private val fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
    -    private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
    -    private val quoteCharacter: Character = null,
    -    private val ignoreFirstLine: Boolean = false,
    -    private val ignoreComments: String = null,
    -    private val lenient: Boolean = false)
    +    private val selectedFields: Array[Int],
    +    private val fieldDelim: String,
    +    private val rowDelim: String,
    +    private val quoteCharacter: Character,
    +    private val ignoreFirstLine: Boolean,
    +    private val ignoreComments: String,
    +    private val lenient: Boolean)
       extends BatchTableSource[Row]
    --- End diff --

    The definition of schema and return type depends on the TableSource and the encoding. Many checks are done in the constructors of classes like `TableSchema` and `RowTypeInfo`.
    I'm currently working on a consistent builder pattern for table sources that can take care of this issue as well.
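The builder idea mentioned in the comment above could look roughly like the following sketch. It is Flink-free and purely illustrative: `CsvSourceConfig`, `CsvSourceBuilder` and all method names are hypothetical, and field types are plain strings standing in for `TypeInformation`. The point is only that a builder gives one place to run schema checks before a source is constructed.

```scala
// Flink-free stand-in for the configuration a CSV table source would need.
final case class CsvSourceConfig(
    path: String,
    fieldNames: Vector[String],
    fieldTypes: Vector[String],
    fieldDelim: String,
    ignoreFirstLine: Boolean)

// Hypothetical builder; the method names are illustrative only.
final class CsvSourceBuilder {
  private var pathOpt: Option[String] = None
  private var fields: Vector[(String, String)] = Vector.empty
  private var delim: String = ","
  private var skipFirstLine: Boolean = false

  def path(p: String): CsvSourceBuilder = { pathOpt = Some(p); this }

  // Name and type are added together, so their counts can never differ.
  def field(name: String, tpe: String): CsvSourceBuilder = {
    require(!fields.exists(_._1 == name), s"Duplicate field name: $name")
    fields = fields :+ (name -> tpe)
    this
  }

  def fieldDelimiter(d: String): CsvSourceBuilder = { delim = d; this }

  def ignoreFirstLine(): CsvSourceBuilder = { skipFirstLine = true; this }

  // The remaining checks run once, before the configuration is created.
  def build(): CsvSourceConfig = {
    require(pathOpt.isDefined, "Path must be set.")
    require(fields.nonEmpty, "At least one field must be defined.")
    CsvSourceConfig(pathOpt.get, fields.map(_._1), fields.map(_._2), delim, skipFirstLine)
  }
}
```

A call such as `new CsvSourceBuilder().path("/tmp/in.csv").field("id", "INT").build()` either yields a consistent configuration or fails with an `IllegalArgumentException`, so the private constructor never sees mismatched arrays.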
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224613#comment-16224613 ]

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147661037

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
    @@ -56,33 +63,123 @@ class StreamTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      tableSource
    +      tableSource,
    +      selectedFields
         )
       }
       override def copy(
           traitSet: RelTraitSet,
    -      newTableSource: TableSource[_])
    -    : PhysicalTableSourceScan = {
    +      newTableSource: TableSource[_]): PhysicalTableSourceScan = {
         new StreamTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      newTableSource.asInstanceOf[StreamTableSource[_]]
    +      newTableSource.asInstanceOf[StreamTableSource[_]],
    +      selectedFields
         )
       }
       override def translateToPlan(
           tableEnv: StreamTableEnvironment,
           queryConfig: StreamQueryConfig): DataStream[CRow] = {
    +    val fieldIndexes = TableSourceUtil.computeIndexMapping(
    +      tableSource,
    +      isStreamTable = true,
    +      selectedFields)
    +
         val config = tableEnv.getConfig
         val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
    -    convertToInternalRow(
    -      new RowSchema(getRowType),
    +    val outputSchema = new RowSchema(this.getRowType)
    +
    +    // check that declared and actual type of table source DataStream are identical
    +    if (inputDataStream.getType != tableSource.getReturnType) {
    +      throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
    +        s"returned a DataStream of type ${inputDataStream.getType} that does not match with the " +
    +        s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. " +
    +        s"Please validate the implementation of the TableSource.")
    +    }
    +
    +    // get expression to extract rowtime attribute
    +    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
    +      tableSource,
    +      selectedFields,
    +      cluster,
    +      tableEnv.getRelBuilder,
    +      TimeIndicatorTypeInfo.ROWTIME_INDICATOR
    +    )
    +
    +    // ingest table and convert and extract time attributes if necessary
    +    val ingestedTable = convertToInternalRow(
    +      outputSchema,
           inputDataStream,
    -      new StreamTableSourceTable(tableSource),
    -      config)
    +      fieldIndexes,
    +      config,
    +      rowtimeExpression)
    +
    +    // generate watermarks for rowtime indicator
    +    val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
    +      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, selectedFields)
    +
    +    val withWatermarks = if (rowtimeDesc.isDefined) {
    +      val rowtimeFieldIdx = outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
    +      val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
    +      watermarkStrategy match {
    +        case p: PeriodicWatermarkAssigner =>
    +          val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
    +          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
    +        case p: PunctuatedWatermarkAssigner =>
    +          val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
    +          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
    +      }
    +    } else {
    +      // No need to generate watermarks if no rowtime attribute is specified.
    +      ingestedTable
    +    }
    +
    +    withWatermarks
    +  }
    +}
    +
    +/**
    +  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
    +  *
    +  * @param timeFieldIdx the index of the rowtime attribute.
    +  * @param assigner the watermark assigner.
    +  */
    +private class PeriodicWatermarkAssignerWrapper(
    +    timeFieldIdx: Int,
    +    assigner: PeriodicWatermarkAssigner)
    +  extends AssignerWithPeriodicWatermarks[CRow] {
    +
    +  override def getCurrentWatermark: Watermark = assigner.getWatermark
    +
    +  override def extractTimestamp(crow: CRow, previousElementTimestamp: Long): Long = {
    +    val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long]
    +    assigner.nextTimestamp(timestamp)
    +    0L
    --- End diff --

    I think this does not really matter. We can always erase the timestamp
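The wrapper in the diff above can be reduced to a small, Flink-free sketch to show what the returned `0L` means. Everything here is a stand-in: `PeriodicAssignerSketch`, `AscendingTimestampsSketch` and `PeriodicWrapperSketch` are hypothetical names, a row is modelled as an `IndexedSeq[Any]`, and watermarks are plain `Long` values rather than Flink `Watermark` objects. Only the two calls visible in the diff, `nextTimestamp` and `getWatermark`, are mirrored.

```scala
// Hypothetical stand-in mirroring the two calls visible in the diff.
abstract class PeriodicAssignerSketch {
  def nextTimestamp(timestamp: Long): Unit
  def getWatermark: Long
}

// Assumed strategy for illustration: watermark = largest timestamp seen so far, minus 1.
final class AscendingTimestampsSketch extends PeriodicAssignerSketch {
  private var maxTimestamp: Long = Long.MinValue + 1

  override def nextTimestamp(timestamp: Long): Unit =
    if (timestamp > maxTimestamp) maxTimestamp = timestamp

  override def getWatermark: Long = maxTimestamp - 1
}

// Sketch of the wrapper: it feeds the rowtime field to the assigner and
// returns a constant record timestamp, as the quoted code does.
final class PeriodicWrapperSketch(timeFieldIdx: Int, assigner: PeriodicAssignerSketch) {

  def getCurrentWatermark: Long = assigner.getWatermark

  def extractTimestamp(row: IndexedSeq[Any], previousElementTimestamp: Long): Long = {
    val timestamp = row(timeFieldIdx).asInstanceOf[Long]
    assigner.nextTimestamp(timestamp)
    0L // the record timestamp is discarded; only the watermark depends on the field
  }
}
```

In this sketch the watermark advances with the values read from the row, while the value returned by `extractTimestamp` stays constant, which is the behaviour the review comment is discussing.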
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224583#comment-16224583 ]

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147658004

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala ---
    @@ -0,0 +1,518 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import java.sql.Timestamp
    +
    +import com.google.common.collect.ImmutableList
    +import org.apache.calcite.plan.RelOptCluster
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.logical.LogicalValues
    +import org.apache.calcite.rex.{RexLiteral, RexNode}
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.table.api.{TableException, Types, ValidationException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference}
    +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
    +
    +import scala.collection.JavaConverters._
    +
    +/** Util class for [[TableSource]]. */
    +object TableSourceUtil {
    +
    +  /** Returns true if the [[TableSource]] has a rowtime attribute. */
    +  def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean =
    +    getRowtimeAttributes(tableSource).nonEmpty
    +
    +  /** Returns true if the [[TableSource]] has a proctime attribute. */
    +  def hasProctimeAttribute(tableSource: TableSource[_]): Boolean =
    +    getProctimeAttributes(tableSource).nonEmpty
    +
    +  /**
    +    * Validates a TableSource.
    +    *
    +    * - checks that all fields of the schema can be resolved
    +    * - checks that resolved fields have the correct type
    +    * - checks that the time attributes are correctly configured.
    +    *
    +    * @param tableSource The [[TableSource]] for which the time attributes are checked.
    +    */
    +  def validateTableSource(tableSource: TableSource[_]): Unit = {
    +
    +    val schema = tableSource.getTableSchema
    +    val tableFieldNames = schema.getColumnNames
    +    val tableFieldTypes = schema.getTypes
    +
    +    // get rowtime and proctime attributes
    +    val rowtimeAttributes = getRowtimeAttributes(tableSource)
    +    val proctimeAttributes = getProctimeAttributes(tableSource)
    +
    +    // validate that schema fields can be resolved to a return type field of correct type
    +    var mappedFieldCnt = 0
    +    tableFieldTypes.zip(tableFieldNames).foreach {
    +      case (t: SqlTimeTypeInfo[_], name: String)
    +        if t.getTypeClass == classOf[Timestamp] && proctimeAttributes.contains(name) =>
    +        // OK, field was mapped to proctime attribute
    +      case (t: SqlTimeTypeInfo[_], name: String)
    +        if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) =>
    +        // OK, field was mapped to rowtime attribute
    +      case (t: TypeInformation[_], name) =>
    +        // check if field is registered as time indicator
    +        if (getProctimeAttributes(tableSource).contains(name)) {
    +          throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
    +            s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.")
    +        }
    +        if (getRowtimeAttributes(tableSource).contains(name)) {
    +          throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
    +            s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
    +        }
    +        // check that field can be resolved in input type
    +        val (physicalName, _, tpe)
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224581#comment-16224581 ]

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147657884

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.ValidationException
    +import org.apache.flink.table.expressions.{Expression, ResolvedFieldReference}
    +
    +/**
    +  * The [[FieldComputer]] interface returns an expression to compute the field of the table schema
    +  * of a [[TableSource]] from one or more fields of the [[TableSource]]'s return type.
    +  *
    +  * @tparam T The result type of the provided expression.
    +  */
    +abstract class FieldComputer[T] {
    --- End diff --

    Constant attributes can be passed via the constructor (see `ExistingField`).
    For example, you can define your example as
    ```
    class ParseStringField(field: String, format: String) extends TimestampExtractor
    ```
    The format string is directly embedded into the generated expression.
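The constructor-based approach described above can be sketched without Flink as follows. The names `TimestampExtractorSketch` and `ParseStringFieldSketch`, the two methods, and the textual `parseTimestamp(...)` expression are all hypothetical; the real extractor builds an expression object rather than a string. The sketch only shows how a constant such as the format pattern ends up inside the generated expression instead of being passed as an extra argument.

```scala
// Hypothetical stand-in for the extractor base class mentioned in the comment.
abstract class TimestampExtractorSketch {
  /** Names of the input fields the expression reads. */
  def getArgumentFields: Array[String]

  /** The generated expression, rendered as text for illustration only. */
  def getExpression: String
}

// Constants arrive through the constructor, so no extra argument list is needed.
final class ParseStringFieldSketch(field: String, format: String)
  extends TimestampExtractorSketch {

  override def getArgumentFields: Array[String] = Array(field)

  // The format pattern is baked into the expression text as a literal.
  override def getExpression: String = s"parseTimestamp($field, '$format')"
}
```

For instance, `new ParseStringFieldSketch("ts", "yyyy-MM-dd HH:mm:ss")` reads only the field `ts`, and the pattern appears as a literal in the expression it produces.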
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224027#comment-16224027 ]

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147574828

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala ---
    @@ -0,0 +1,518 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import java.sql.Timestamp
    +
    +import com.google.common.collect.ImmutableList
    +import org.apache.calcite.plan.RelOptCluster
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.logical.LogicalValues
    +import org.apache.calcite.rex.{RexLiteral, RexNode}
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.table.api.{TableException, Types, ValidationException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference}
    +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
    +
    +import scala.collection.JavaConverters._
    +
    +/** Util class for [[TableSource]]. */
    +object TableSourceUtil {
    +
    +  /** Returns true if the [[TableSource]] has a rowtime attribute. */
    +  def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean =
    +    getRowtimeAttributes(tableSource).nonEmpty
    +
    +  /** Returns true if the [[TableSource]] has a proctime attribute. */
    +  def hasProctimeAttribute(tableSource: TableSource[_]): Boolean =
    +    getProctimeAttributes(tableSource).nonEmpty
    +
    +  /**
    +    * Validates a TableSource.
    +    *
    +    * - checks that all fields of the schema can be resolved
    +    * - checks that resolved fields have the correct type
    +    * - checks that the time attributes are correctly configured.
    +    *
    +    * @param tableSource The [[TableSource]] for which the time attributes are checked.
    +    */
    +  def validateTableSource(tableSource: TableSource[_]): Unit = {
    +
    +    val schema = tableSource.getTableSchema
    +    val tableFieldNames = schema.getColumnNames
    +    val tableFieldTypes = schema.getTypes
    +
    +    // get rowtime and proctime attributes
    +    val rowtimeAttributes = getRowtimeAttributes(tableSource)
    +    val proctimeAttributes = getProctimeAttributes(tableSource)
    +
    +    // validate that schema fields can be resolved to a return type field of correct type
    +    var mappedFieldCnt = 0
    +    tableFieldTypes.zip(tableFieldNames).foreach {
    +      case (t: SqlTimeTypeInfo[_], name: String)
    +        if t.getTypeClass == classOf[Timestamp] && proctimeAttributes.contains(name) =>
    +        // OK, field was mapped to proctime attribute
    +      case (t: SqlTimeTypeInfo[_], name: String)
    +        if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) =>
    +        // OK, field was mapped to rowtime attribute
    +      case (t: TypeInformation[_], name) =>
    +        // check if field is registered as time indicator
    +        if (getProctimeAttributes(tableSource).contains(name)) {
    +          throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
    +            s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.")
    +        }
    +        if (getRowtimeAttributes(tableSource).contains(name)) {
    +          throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
    +            s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
    +        }
    +        // check that field can be resolved in input type
    +        val (physicalName, _, tpe) =
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224025#comment-16224025 ]

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147582469

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala ---
    @@ -38,23 +38,26 @@ import scala.collection.mutable
       * @param path The path to the CSV file.
       * @param fieldNames The names of the table fields.
       * @param fieldTypes The types of the table fields.
    +  * @param selectedFields The fields which will be read and returned by the table source.
    +  *                       If None, all fields are returned.
       * @param fieldDelim The field delimiter, "," by default.
       * @param rowDelim The row delimiter, "\n" by default.
       * @param quoteCharacter An optional quote character for String values, null by default.
       * @param ignoreFirstLine Flag to ignore the first line, false by default.
       * @param ignoreComments An optional prefix to indicate comments, null by default.
       * @param lenient Flag to skip records with parse error instead to fail, false by default.
       */
    -class CsvTableSource(
    +class CsvTableSource private (
         private val path: String,
         private val fieldNames: Array[String],
         private val fieldTypes: Array[TypeInformation[_]],
    -    private val fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
    -    private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
    -    private val quoteCharacter: Character = null,
    -    private val ignoreFirstLine: Boolean = false,
    -    private val ignoreComments: String = null,
    -    private val lenient: Boolean = false)
    +    private val selectedFields: Array[Int],
    +    private val fieldDelim: String,
    +    private val rowDelim: String,
    +    private val quoteCharacter: Character,
    +    private val ignoreFirstLine: Boolean,
    +    private val ignoreComments: String,
    +    private val lenient: Boolean)
       extends BatchTableSource[Row]
    --- End diff --

    Maybe we need a base class instead of traits, so that checks such as verifying that the number of field names equals the number of field types can be done in one place.
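The base-class suggestion above could be sketched as follows. This is a Flink-free illustration under stated assumptions: `SchemaCheckedSource` and `CsvLikeSource` are hypothetical names, and field types are plain strings standing in for `TypeInformation`. One reason a class fits here is that Scala 2 traits cannot take constructor parameters, so a shared constructor check is easier to express in an abstract class.

```scala
// Hypothetical base class; the checks run in its constructor for every subclass.
abstract class SchemaCheckedSource(
    val fieldNames: Array[String],
    val fieldTypes: Array[String]) {

  require(
    fieldNames.length == fieldTypes.length,
    s"Number of field names (${fieldNames.length}) and " +
      s"field types (${fieldTypes.length}) must be equal.")
  require(fieldNames.distinct.length == fieldNames.length, "Field names must be unique.")
}

// Hypothetical source that inherits the checks without repeating them.
final class CsvLikeSource(
    val path: String,
    names: Array[String],
    types: Array[String])
  extends SchemaCheckedSource(names, types)
```

Constructing `new CsvLikeSource("/tmp/in.csv", Array("a", "b"), Array("INT"))` fails immediately with an `IllegalArgumentException`, before any subclass logic runs.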
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224024#comment-16224024 ]

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147583120

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.ValidationException
    +import org.apache.flink.table.expressions.{Expression, ResolvedFieldReference}
    +
    +/**
    +  * The [[FieldComputer]] interface returns an expression to compute the field of the table schema
    +  * of a [[TableSource]] from one or more fields of the [[TableSource]]'s return type.
    +  *
    +  * @tparam T The result type of the provided expression.
    +  */
    +abstract class FieldComputer[T] {
    --- End diff --

    I think we could add a `getArguments: Array[_]` function to allow providing extra arguments besides the existing fields. For example, when event time is represented as a `String`, the argument would be the time pattern.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224026#comment-16224026 ]

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147580191

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
    @@ -56,33 +63,123 @@ class StreamTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      tableSource
    +      tableSource,
    +      selectedFields
         )
       }
       override def copy(
           traitSet: RelTraitSet,
    -      newTableSource: TableSource[_])
    -    : PhysicalTableSourceScan = {
    +      newTableSource: TableSource[_]): PhysicalTableSourceScan = {
         new StreamTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      newTableSource.asInstanceOf[StreamTableSource[_]]
    +      newTableSource.asInstanceOf[StreamTableSource[_]],
    +      selectedFields
         )
       }
       override def translateToPlan(
           tableEnv: StreamTableEnvironment,
           queryConfig: StreamQueryConfig): DataStream[CRow] = {
    +    val fieldIndexes = TableSourceUtil.computeIndexMapping(
    +      tableSource,
    +      isStreamTable = true,
    +      selectedFields)
    +
         val config = tableEnv.getConfig
         val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
    -    convertToInternalRow(
    -      new RowSchema(getRowType),
    +    val outputSchema = new RowSchema(this.getRowType)
    +
    +    // check that declared and actual type of table source DataStream are identical
    +    if (inputDataStream.getType != tableSource.getReturnType) {
    +      throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
    +        s"returned a DataStream of type ${inputDataStream.getType} that does not match with the " +
    +        s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. " +
    +        s"Please validate the implementation of the TableSource.")
    +    }
    +
    +    // get expression to extract rowtime attribute
    +    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
    +      tableSource,
    +      selectedFields,
    +      cluster,
    +      tableEnv.getRelBuilder,
    +      TimeIndicatorTypeInfo.ROWTIME_INDICATOR
    +    )
    +
    +    // ingest table and convert and extract time attributes if necessary
    +    val ingestedTable = convertToInternalRow(
    +      outputSchema,
           inputDataStream,
    -      new StreamTableSourceTable(tableSource),
    -      config)
    +      fieldIndexes,
    +      config,
    +      rowtimeExpression)
    +
    +    // generate watermarks for rowtime indicator
    +    val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
    +      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, selectedFields)
    +
    +    val withWatermarks = if (rowtimeDesc.isDefined) {
    +      val rowtimeFieldIdx = outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
    +      val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
    +      watermarkStrategy match {
    +        case p: PeriodicWatermarkAssigner =>
    +          val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
    +          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
    +        case p: PunctuatedWatermarkAssigner =>
    +          val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
    +          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
    +      }
    +    } else {
    +      // No need to generate watermarks if no rowtime attribute is specified.
    +      ingestedTable
    +    }
    +
    +    withWatermarks
    +  }
    +}
    +
    +/**
    +  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
    +  *
    +  * @param timeFieldIdx the index of the rowtime attribute.
    +  * @param assigner the watermark assigner.
    +  */
    +private class PeriodicWatermarkAssignerWrapper(
    +    timeFieldIdx: Int,
    +    assigner: PeriodicWatermarkAssigner)
    +  extends AssignerWithPeriodicWatermarks[CRow] {
    +
    +  override def getCurrentWatermark: Watermark = assigner.getWatermark
    +
    +  override def extractTimestamp(crow: CRow, previousElementTimestamp: Long): Long = {
    +    val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long]
    +    assigner.nextTimestamp(timestamp)
    +    0L
    --- End diff --

    I know the timestamp in the `StreamRecord` is useless for the Table/SQL API
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223885#comment-16223885 ]

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4894

    I'm planning to merge this PR before the feature freeze on Tuesday.

    Cheers, Fabian
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222519#comment-16222519 ] ASF GitHub Bot commented on FLINK-7548: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4894 Added documentation for the new TableSource interfaces.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222013#comment-16222013 ] ASF GitHub Bot commented on FLINK-7548: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4894 Thanks for the feedback @twalthr. I pushed an update.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221986#comment-16221986 ] ASF GitHub Bot commented on FLINK-7548: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r147358782

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -84,6 +86,13 @@
             return typeInfo;
         }
     
    +    @Override
    +    public TableSchema getTableSchema() {
    +        return new TableSchema(
    +            ((RowTypeInfo) typeInfo).getFieldNames(),
    --- End diff --

What do you think about adding a `fromTypeInfo` method to the companion object that creates a `TableSchema`?
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221869#comment-16221869 ] ASF GitHub Bot commented on FLINK-7548: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r147339907

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala ---
    @@ -18,55 +18,7 @@
     
     package org.apache.flink.table.plan.nodes
     
    -import org.apache.flink.api.common.functions.Function
    -import org.apache.flink.api.common.typeinfo.TypeInformation
    -import org.apache.flink.table.api.TableConfig
    -import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
    -import org.apache.flink.types.Row
    -
     /**
       * Common class for batch and stream scans.
       */
    -trait CommonScan[T] {
    -
    -  /**
    -    * We check if the input type is exactly the same as the internal row type.
    -    * A conversion is necessary if types differ.
    -    */
    -  private[flink] def needsConversion(
    -      externalTypeInfo: TypeInformation[Any],
    -      internalTypeInfo: TypeInformation[T]): Boolean =
    -    externalTypeInfo != internalTypeInfo
    -
    -  private[flink] def generatedConversionFunction[F <: Function](
    -      config: TableConfig,
    -      functionClass: Class[F],
    -      inputType: TypeInformation[Any],
    -      expectedType: TypeInformation[Row],
    -      conversionOperatorName: String,
    -      fieldNames: Seq[String],
    -      inputFieldMapping: Option[Array[Int]] = None)
    -    : GeneratedFunction[F, Row] = {
    -
    -    val generator = new FunctionCodeGenerator(
    -      config,
    -      false,
    -      inputType,
    -      None,
    -      inputFieldMapping)
    -    val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
    -
    -    val body =
    -      s"""
    -         |${conversion.code}
    -         |return ${conversion.resultTerm};
    -         |""".stripMargin
    -
    -    generator.generateFunction(
    -      conversionOperatorName,
    -      functionClass,
    -      body,
    -      expectedType)
    -  }
    -
    -}
    +trait CommonScan[T]
    --- End diff --

I kept it for consistency. The other operators have a CommonX trait as well.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221867#comment-16221867 ] ASF GitHub Bot commented on FLINK-7548: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r147339820

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1332,6 +1322,11 @@ abstract class CodeGenerator(
         GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP)
       }
     
    +  private[flink] def generateCurrentTimestamp(): GeneratedExpression = {
    +    val rexBuilder = new RexBuilder(new FlinkTypeFactory(new FlinkTypeSystem))
    +    generateExpression(rexBuilder.makeCall(CURRENT_TIMESTAMP))
    --- End diff --

Ah, thanks. I was looking for something like that.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221865#comment-16221865 ] ASF GitHub Bot commented on FLINK-7548: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r147339574

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
    @@ -124,9 +124,15 @@ object ExternalTableSourceUtil extends Logging {
           } else {
             FlinkStatistic.UNKNOWN
           }
    +
           convertedTableSource match {
    -        case s : StreamTableSource[_] => new StreamTableSourceTable(s, flinkStatistic)
    -        case _ => new TableSourceTable(convertedTableSource, flinkStatistic)
    +        case s: StreamTableSource[_] =>
    --- End diff --

In that case a `StreamTableSourceTable` is registered. I kept the previous behavior. We can fix it in a follow up issue and make the decision based on the execution environment.
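
The behavior described in this reply (a source that implements both interfaces is registered as a streaming table) follows from the order of the match cases. A minimal Java sketch of that first-match rule, using hypothetical marker interfaces rather than Flink's `StreamTableSource` and `BatchTableSource`, and assuming a separate batch branch that the truncated diff does not show:

```java
/** Hypothetical marker interfaces; stand-ins for the stream and batch source interfaces. */
interface StreamSourceSketch {}

interface BatchSourceSketch {}

/** A source that implements both hypothetical interfaces. */
final class DualSourceSketch implements StreamSourceSketch, BatchSourceSketch {}

final class SourceRegistrationSketch {
    /** Returns the kind of table that would be registered; the stream check comes first. */
    static String tableKindFor(Object source) {
        if (source instanceof StreamSourceSketch) {
            return "stream"; // wins whenever the source is (also) a stream source
        }
        if (source instanceof BatchSourceSketch) {
            return "batch"; // assumption: batch-only sources get their own table type
        }
        throw new IllegalArgumentException("Unknown source type: " + source.getClass());
    }
}
```

Deciding by execution environment, as proposed for the follow-up, would replace the fixed check order with an explicit parameter.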
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221812#comment-16221812 ] ASF GitHub Bot commented on FLINK-7548: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r146585641

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1332,6 +1322,11 @@ abstract class CodeGenerator(
         GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP)
       }
     
    +  private[flink] def generateCurrentTimestamp(): GeneratedExpression = {
    +    val rexBuilder = new RexBuilder(new FlinkTypeFactory(new FlinkTypeSystem))
    +    generateExpression(rexBuilder.makeCall(CURRENT_TIMESTAMP))
    --- End diff --

Use `new CurrentTimePointCallGen().generate()` instead of creating a whole new type environment ;)
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221813#comment-16221813 ] ASF GitHub Bot commented on FLINK-7548: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r146575110

    --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java ---
    @@ -89,34 +98,50 @@ public void addColumn(String family, String qualifier, Class clazz) {
          * @param charset Name of the charset to use.
          */
         public void setCharset(String charset) {
    -        this.schema.setCharset(charset);
    +        this.hBaseSchema.setCharset(charset);
         }
     
         @Override
         public TypeInformation getReturnType() {
    -        String[] famNames = schema.getFamilyNames();
    -        TypeInformation[] typeInfos = new TypeInformation[famNames.length];
    +        return new RowTypeInfo(getFieldTypes(), getFieldNames());
    +    }
    +
    +    public TableSchema getTableSchema() {
    --- End diff --

Add `@Override` annotation.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221811#comment-16221811 ] ASF GitHub Bot commented on FLINK-7548: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r146583860

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -246,40 +245,31 @@ abstract class CodeGenerator(
         */
       def generateConverterResultExpression(
           returnType: TypeInformation[_ <: Any],
    -      resultFieldNames: Seq[String])
    +      resultFieldNames: Seq[String],
    +      rowtimeExpression: Option[RexNode] = None)
    --- End diff --

Add the Scala doc for the parameter.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221808#comment-16221808 ] ASF GitHub Bot commented on FLINK-7548: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r146576347

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala ---
    @@ -50,6 +50,11 @@ class TableSchema(
     
       val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap
     
    +  /** Returns a copy of the TableSchema */
    +  def copy: TableSchema = {
    --- End diff --

Maybe mention the deep copy behavior.
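
To illustrate why the copy semantics are worth documenting, here is a small sketch of a schema-like class whose `copy()` clones its arrays so that the copy and the original cannot affect each other. `SchemaCopySketch` is a hypothetical class and not Flink's `TableSchema`. Cloning the arrays is enough here because the elements (`String` and `Class`) are immutable.

```java
/** Hypothetical schema holder used only to illustrate copy semantics. */
final class SchemaCopySketch {
    private final String[] columnNames;
    private final Class<?>[] columnTypes;

    SchemaCopySketch(String[] columnNames, Class<?>[] columnTypes) {
        if (columnNames.length != columnTypes.length) {
            throw new IllegalArgumentException("Number of names and types must match.");
        }
        this.columnNames = columnNames;
        this.columnTypes = columnTypes;
    }

    /** Returns a copy that owns its own arrays instead of sharing them with this instance. */
    SchemaCopySketch copy() {
        return new SchemaCopySketch(columnNames.clone(), columnTypes.clone());
    }

    /** Exposes the internal array on purpose, so the effect of sharing is observable. */
    String[] names() {
        return columnNames;
    }
}
```

If `copy()` passed the same array references instead, renaming a column through the copy would silently rename it in the original as well.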
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221805#comment-16221805 ] ASF GitHub Bot commented on FLINK-7548: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r146587166

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala ---
    @@ -22,24 +22,30 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet}
     import org.apache.calcite.rel.RelWriter
     import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.rel.core.TableScan
    -import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.TableException
     import org.apache.flink.table.calcite.FlinkTypeFactory
    -import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
    +import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
     
     import scala.collection.JavaConverters._
     
     abstract class PhysicalTableSourceScan(
         cluster: RelOptCluster,
         traitSet: RelTraitSet,
         table: RelOptTable,
    -    val tableSource: TableSource[_])
    +    val tableSource: TableSource[_],
    +    val selectedFields: Option[Array[Int]])
       extends TableScan(cluster, traitSet, table) {
     
       override def deriveRowType(): RelDataType = {
         val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    -    flinkTypeFactory.buildLogicalRowType(
    -      TableEnvironment.getFieldNames(tableSource),
    -      TableEnvironment.getFieldTypes(tableSource.getReturnType))
    +    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
    +      case _: StreamTableSourceTable[_] => true
    +      case _: BatchTableSourceTable[_] => false
    +      case t => throw TableException(s"Unknown Table type ${t.getClass}.")
    +    }
    +
    +    TableSourceUtil.getTableSchema(tableSource, selectedFields, streamingTable, flinkTypeFactory)
    --- End diff --

I would rename this method to `getRelDataType`, because it does not return our `TableSchema`.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221810#comment-16221810 ] ASF GitHub Bot commented on FLINK-7548: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r146582532

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
    @@ -124,9 +124,15 @@ object ExternalTableSourceUtil extends Logging {
           } else {
             FlinkStatistic.UNKNOWN
           }
    +
           convertedTableSource match {
    -        case s : StreamTableSource[_] => new StreamTableSourceTable(s, flinkStatistic)
    -        case _ => new TableSourceTable(convertedTableSource, flinkStatistic)
    +        case s: StreamTableSource[_] =>
    --- End diff --

What happens if the table source implements both interfaces?
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221807#comment-16221807 ] ASF GitHub Bot commented on FLINK-7548: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r146584051

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -246,40 +245,31 @@ abstract class CodeGenerator(
         */
       def generateConverterResultExpression(
           returnType: TypeInformation[_ <: Any],
    -      resultFieldNames: Seq[String])
    +      resultFieldNames: Seq[String],
    +      rowtimeExpression: Option[RexNode] = None)
         : GeneratedExpression = {
     
         val input1AccessExprs = input1Mapping.map {
    -      case TimeIndicatorTypeInfo.ROWTIME_MARKER =>
    -        // attribute is a rowtime indicator. Access event-time timestamp in StreamRecord.
    -        generateRowtimeAccess()
    -      case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
    +      case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER |
    +        TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER =>
    +        // attribute is a rowtime indicator.
    +        if (rowtimeExpression.isDefined) {
    --- End diff --

We could use pattern matching here.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221806#comment-16221806 ] ASF GitHub Bot commented on FLINK-7548: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r146574319

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -84,6 +86,13 @@
             return typeInfo;
         }
     
    +    @Override
    +    public TableSchema getTableSchema() {
    +        return new TableSchema(
    +            ((RowTypeInfo) typeInfo).getFieldNames(),
    --- End diff --

Maybe we should add a constructor or method to `TableSchema` that takes a TypeInfo. I think this case is very common for table sources.
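
A rough sketch of the kind of factory method suggested here, so that a table source does not have to unpack field names and types by hand. Both classes are hypothetical stand-ins (`RowTypeSketch` for a row type information, `TableSchemaSketch` for a schema), and `fromRowType` is an illustrative name, not an existing Flink method.

```java
/** Hypothetical stand-in for a row type that carries field names and field types. */
final class RowTypeSketch {
    final String[] fieldNames;
    final Class<?>[] fieldTypes;

    RowTypeSketch(String[] fieldNames, Class<?>[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException("Number of names and types must match.");
        }
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }
}

/** Hypothetical schema class with a factory that derives the schema from a row type. */
final class TableSchemaSketch {
    final String[] columnNames;
    final Class<?>[] columnTypes;

    private TableSchemaSketch(String[] columnNames, Class<?>[] columnTypes) {
        this.columnNames = columnNames;
        this.columnTypes = columnTypes;
    }

    /** Builds a schema whose columns mirror the fields of the given row type. */
    static TableSchemaSketch fromRowType(RowTypeSketch rowType) {
        // Clone the arrays so the schema does not share state with the row type.
        return new TableSchemaSketch(rowType.fieldNames.clone(), rowType.fieldTypes.clone());
    }
}
```

With such a factory, the `getTableSchema()` implementation in the diff would shrink to a single call on the source's type information.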
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221809#comment-16221809 ] ASF GitHub Bot commented on FLINK-7548: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r146586028

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala ---
    @@ -18,55 +18,7 @@
     
     package org.apache.flink.table.plan.nodes
     
    -import org.apache.flink.api.common.functions.Function
    -import org.apache.flink.api.common.typeinfo.TypeInformation
    -import org.apache.flink.table.api.TableConfig
    -import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
    -import org.apache.flink.types.Row
    -
     /**
       * Common class for batch and stream scans.
       */
    -trait CommonScan[T] {
    -
    -  /**
    -    * We check if the input type is exactly the same as the internal row type.
    -    * A conversion is necessary if types differ.
    -    */
    -  private[flink] def needsConversion(
    -      externalTypeInfo: TypeInformation[Any],
    -      internalTypeInfo: TypeInformation[T]): Boolean =
    -    externalTypeInfo != internalTypeInfo
    -
    -  private[flink] def generatedConversionFunction[F <: Function](
    -      config: TableConfig,
    -      functionClass: Class[F],
    -      inputType: TypeInformation[Any],
    -      expectedType: TypeInformation[Row],
    -      conversionOperatorName: String,
    -      fieldNames: Seq[String],
    -      inputFieldMapping: Option[Array[Int]] = None)
    -    : GeneratedFunction[F, Row] = {
    -
    -    val generator = new FunctionCodeGenerator(
    -      config,
    -      false,
    -      inputType,
    -      None,
    -      inputFieldMapping)
    -    val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
    -
    -    val body =
    -      s"""
    -         |${conversion.code}
    -         |return ${conversion.resultTerm};
    -         |""".stripMargin
    -
    -    generator.generateFunction(
    -      conversionOperatorName,
    -      functionClass,
    -      body,
    -      expectedType)
    -  }
    -
    -}
    +trait CommonScan[T]
    --- End diff --

Do we still need this trait if it does not contain anything? Let's keep it simple and remove it for now.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216777#comment-16216777 ] ASF GitHub Bot commented on FLINK-7548: --- GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/4894

[FLINK-7548] [table] Improve rowtime support of TableSources.

## What is the purpose of the change

This PR refactors the `TableSource` interface and some related interfaces. The goal of the refactoring is to:

1. Move timestamp extraction and watermark assignment into the table scan. This way it is under the control of the Table API and does not rely on extraction and assignment in the `TableSource`.
2. Unify time attribute handling in `BatchTableSource` and `StreamTableSource`.
3. Support projection push down for table sources with time attributes.
4. Support projection push down into table access.
5. Support time attributes for table sources with an atomic return type.

This commit also resolves:

- FLINK-6870: Unified handling of time attributes in batch and streaming
- FLINK-7179: Support for projection push down and watermark assigners
- FLINK-7696: Support for projection push down and time attributes

## Brief change log

### API changes

* Added a `getTableSchema()` method to `TableSource` to separate the logical table schema from the physical return type (`getReturnType()`).
* Replaced the `DefinedFieldNames` interface by the `DefinedFieldMapping` interface. Field names can be defined by the table schema. The `DefinedFieldMapping` interface allows providing an explicit mapping from table schema fields to return type fields.
* The `DefinedRowtimeAttribute` interface was replaced by the `DefinedRowtimeAttributes` interface. The new interface returns a list of `RowtimeAttributeDescriptor`, each of which exposes a `TimestampExtractor` and a `WatermarkStrategy`. Both are used internally to extract timestamps and assign watermarks during the table scan. There are default implementations for common timestamp extraction and watermark generation methods.

## Verifying this change

This PR adds an extensive set of validation, plan, and end-to-end integration tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **YES**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation

- Does this pull request introduce a new feature? **YES**
- If yes, how is the feature documented? **not yet**, PR will be updated soon.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/flink tableWatermarks

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4894.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4894

commit 5a3b0e02b58b0b5eeece60514b9affaf738c10b5
Author: Fabian Hueske
Date: 2017-10-05T15:44:35Z

    [FLINK-7548] [table] Improve rowtime support of TableSources.

    This commit also resolves:
    - FLINK-6870: Unified handling of time attributes in batch and streaming
    - FLINK-7179: Support for projection push down and watermark assigners
    - FLINK-7696: Support for projection push down and time attributes

    TODO:
    - update documentation
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215805#comment-16215805 ] Fabian Hueske commented on FLINK-7548: --

Thanks for the feedback [~wheat9]. I've addressed the issue of projection push down in my WIP branch and think I've found a good solution. A {{ProjectableTableSource}} only needs to adjust the {{TypeInformation}} and the {{DataStream}} or {{DataSet}}. The {{TableSchema}} does not need to be adapted.

I've also extended the support for projection push down by pushing projections into a table scan even if the {{TableSource}} does not implement {{ProjectableTableSource}}. In that case, the projection is applied by the initial {{Row}} conversion, such that a projecting Calc can be removed if it only selects or reorders fields. It would also be possible to push expressions into the scan, but that should be a follow-up issue.

I'll open a PR in the next hours. It will be quite large (~2500 LOC changes) but it would be great if I could get some feedback, especially for the API-facing classes.
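The scan-side projection described in this comment can be pictured with a small sketch. This is an illustration under assumptions, not the WIP branch code: rows are modelled as plain `Object[]` instead of Flink's {{Row}}, and `ScanProjectionSketch` and `convert` are hypothetical names.

```java
import java.util.Arrays;

/** Hypothetical sketch, not Flink code: projection applied during the initial row conversion. */
final class ScanProjectionSketch {

    /** Copies the selected physical fields into a new row, in the requested order. */
    static Object[] convert(Object[] physicalRow, int[] projectedFields) {
        Object[] out = new Object[projectedFields.length];
        for (int i = 0; i < projectedFields.length; i++) {
            out[i] = physicalRow[projectedFields[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        // Physical row as produced by the source: (user, amount, ts).
        Object[] row = {"alice", 42L, 1507218275000L};
        // A query that only selects (ts, user): the conversion selects and reorders the fields,
        // so no separate projecting operator is needed afterwards.
        System.out.println(Arrays.toString(convert(row, new int[] {2, 0})));
        // prints [1507218275000, alice]
    }
}
```

The source itself keeps producing the full physical row here; only the index array handed to the conversion changes, which is why the table schema of the source does not need to be adapted in this case.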
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215758#comment-16215758 ] Haohui Mai commented on FLINK-7548: ---

Sorry for the late response. The APIs should work for our use cases as long as the timestamps can be extracted through an expression. I think [~ykt836] brought up a good point -- it might be tricky to implement projection push down in this case. What would be our strategies there?
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209327#comment-16209327 ] Kurt Young commented on FLINK-7548: ---

Actually I was thinking that when projections are pushed down, we should keep the TableSource's schema unchanged. For example, when we want to read a CSV file, the schema of the file is more or less fixed. Whether we want to read all fields or only some selected fields from that file will not affect the schema of the table. I think we should keep the schema fixed and change the result type after projection push down.

But this depends on how we see or define the {{schema}} of a {{TableSource}}. If the schema changes with field projections, it makes no difference compared to the current result type inference. What do you think?
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209299#comment-16209299 ] Fabian Hueske commented on FLINK-7548: --

Thanks for the comment [~ykt836]. I think the projection push-down behavior is similar to before. When a projection is pushed down, a {{TableSource}} is not modified but we create a new {{TableSource}} with a reduced schema. This is what happens at the moment as well.

The difference is that right now the schema is inferred by the Table API / SQL internals from the return type, and with my change the schema would be explicitly given by the {{TableSource}}. This makes it easier to support projection push-down also for {{TableSource}} implementations with time fields, because these would no longer be automatically merged into the schema by the Table API internals. Instead, the {{TableSource}} knows everything about its schema.

Nonetheless, you are right. When projections are pushed down, a {{TableSource}} needs to generate a new return type and a new schema for the new {{TableSource}} (and possibly adjust the rowtime / proctime field information).
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209188#comment-16209188 ] Kurt Young commented on FLINK-7548: ---

Hi [~fhueske], sorry for the late response. I'm not very familiar with streaming-related stuff, but I like the idea of separating the schema and the actual returned type; it makes the concepts clearer.

One question here: do you think the table schema should remain immutable once we have defined it? Take a CsvTableSource: IMO the schema should be fixed once we decide which file to read. But according to your WIP branch, if a projection is pushed down into CsvTableSource, it also changes the table schema. I'm not sure this is correct. What do you think?
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209002#comment-16209002 ] Till Rohrmann commented on FLINK-7548: --

Is this strictly a blocker for 1.4?
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16208238#comment-16208238 ] Fabian Hueske commented on FLINK-7548: --

Thanks for the comments [~twalthr] and [~xccui].

@ Timo: I would not add {{TimestampExtractor}} and {{WatermarkAssigner}} to {{TableSchema}}. {{TableSchema}} is also used in other places, and timestamp extractors & watermark assigners are not relevant in these contexts. IMO, {{TimestampExtractor}} and {{WatermarkAssigner}} belong to a {{TableSource}} but not to the schema. The schema could encode that these fields are non-nullable and watermarked (or however we want to call this property). I think the issue of defining fields twice is rather an issue of a concrete {{TableSource}} implementation than of the internal API. Given the current API, it is possible to have a {{TableSource}} that requires defining a field just once (in whatever way) and generates an appropriate TableSchema and proctime field name.

@ Xingcan:

1. With the current design it shouldn't be too hard to define a {{TimestampExtractor}} that parses a string field. I would not add it to the {{ExistingField}} extractor because it would need an additional parameter that specifies the format of the timestamp string.
2. Multiple rowtime attributes might not be possible yet but might be later. With this API we have the option to add this feature later without breaking the API.
3. It's also possible to add a {{TimestampExtractor}} that assigns the current timestamp as event timestamp, i.e., ingestion time.
4. Yes, that was my initial motivation to use RexNode instead of Table API Expressions. I was thinking that it would be easier to parse SQL expressions from a CREATE TABLE statement into RexNodes (using some Calcite code) than into Expressions. OTOH, I think it makes sense to be consistent and not expose the Calcite API.

I'd be happy to hear what others think about the RexNode vs. Expression choice.
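Points 1 and 3 of the reply to Xingcan can be sketched as two small extractors. This is only an illustration under assumptions: the proposal expresses extraction as an expression that is code-generated into the scan, whereas the sketch uses plain Java functions over `Object[]` rows; `TimestampExtractorSketch`, `parseStringField`, and `ingestionTime` are hypothetical names, and timestamps are interpreted as UTC.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.ToLongFunction;

/** Hypothetical sketch, not Flink code: two ways to derive an event timestamp for a row. */
final class TimestampExtractorSketch {

    /**
     * Extractor that parses a string field. The pattern is the additional formatting
     * parameter mentioned in point 1. Assumption: the strings denote UTC timestamps.
     */
    static ToLongFunction<Object[]> parseStringField(int fieldIndex, String pattern) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
        return row -> LocalDateTime.parse((String) row[fieldIndex], formatter)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    /** Extractor that ignores the row and uses the wall clock, i.e., ingestion time (point 3). */
    static ToLongFunction<Object[]> ingestionTime() {
        return row -> System.currentTimeMillis();
    }

    public static void main(String[] args) {
        ToLongFunction<Object[]> extractor = parseStringField(1, "yyyy-MM-dd HH:mm:ss");
        Object[] row = {"alice", "1970-01-01 00:00:01"};
        System.out.println(extractor.applyAsLong(row)); // prints 1000
    }
}
```

Keeping the format as a constructor-style parameter is what distinguishes this from an extractor that simply reuses an existing {{Long}} or {{Timestamp}} field.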
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207782#comment-16207782 ] Xingcan Cui commented on FLINK-7548:

Hi [~fhueske], sorry for the late reply. I'll first share some ideas that struck me and then keep the focus on this issue.

# Now that the rowtime can be extracted by expressions, shall we consider allowing more data types for the original fields? For instance, users may want to extract the timestamps from a {{String}} field or even from multiple fields by a UDF.
# Since the stream API does not support multiple watermarks, maybe it's not necessary to define a list of {{RowtimeAttributeDescriptor}}.
# Sometimes processing time is not applicable, yet the streams lack timestamps. I think it would be convenient to provide a built-in mechanism that can materialize the ingestion time as the rowtime.
# Using Table API Expressions sounds like a good idea. However, I just wonder whether it will make the DDL harder to implement.

Best, Xingcan
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207717#comment-16207717 ] Timo Walther commented on FLINK-7548: -

Thanks for the proposal [~fhueske]. I like the idea of a separate {{TableSchema}} for sources that a user can implement. Maybe we can simplify the design even more by extending {{TableSchema}} with additional methods like {{setRowtime(TimestampExtractor, WatermarkAssigner), setProctime()}}. Then we would not need the {{DefinedProctimeAttribute}} and {{DefinedRowtimeAttributes}} interfaces anymore. If we want to change the logic in the future, we don't have to modify interfaces but can deprecate or add additional methods to {{TableSchema}}. Having {{setRowtime/setProctime()}} would also hide the internal {{TimeIndicatorType}} and would not require defining a field name twice (in TableSchema and DefinedProctimeAttribute). What do you think?
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207628#comment-16207628 ] Fabian Hueske commented on FLINK-7548: --

Hi [~jark], [~wheat9], [~twalthr], [~xccui], [~ykt836], I'd really like your feedback on this effort. As recently discussed on the dev mailing list, the plan is to cut the 1.4.0 release branch in two weeks and I think this change should go in.

In the meantime, I thought about the interface and I'll do some refinements:

- Time indicator attributes will be of type Types.SQL_TIMESTAMP in the TableSource (instead of TimeIndicatorType). The reasons are that 1) they are exposed as TIMESTAMP in the schema and 2) this hides the internal representation from the user.
- I'll add a DefinedProctimeAttribute interface that returns the name of a processing time field, i.e., the field must exist in the TableSchema and will be used as processing time indicator.
- The DefinedRowtimeAttributes will be changed to return a list of RowtimeAttributeDescriptors. The field names of the descriptors are matched to the TableSchema and these fields must be of type Timestamp.
- The rowtime extraction expression will be a Flink Table API Expression instead of a Calcite RexNode. Until now we have hidden Calcite interfaces from the users.
- The BatchTableSourceScan will be adapted to also extract rowtime attributes and assign the current timestamp to processing time attributes. This way, we have unified handling if a TableSource implements both BatchTableSource and StreamTableSource.

Please let me know what you think.

Thanks, Fabian
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197673#comment-16197673 ] Fabian Hueske commented on FLINK-7548: --

I've created a proposal for a reworked TableSource API that solves several issues. I've pushed a WIP branch to my repository: https://github.com/fhueske/flink/tree/tableWatermarks-extended

The main changes are as follows:

- The {{TableSource}} interface gets a new method {{getTableSchema(): TableSchema}} which returns the schema with all fields of the resulting table. For streaming tables, this also includes rowtime indicators. These fields have the corresponding {{TimeIndicatorTypeInformation}} types. This means that the {{TypeInformation}} returned by {{getReturnType()}} no longer determines the schema and fields of the table. Hence, we need to perform a mapping between the physical input type ({{getReturnType()}}) and the logical table schema ({{getTableSchema()}}). By default, this mapping is done based on field names, i.e., a table schema field is mapped to the field of the same name in the physical input type. If we cannot find a physical field for a logical field or if the types do not match, the {{TableSource}} is rejected. This default behavior resembles the current behavior. If fields should not be automatically mapped by name, we will allow specifying a manual index-based mapping, similar to the previous {{DefinedFieldNames}} interface (note: this is not yet included in the WIP branch).
- Processing time fields (i.e., fields in the table schema with type {{TimeIndicatorTypeInformation(false)}}) are automatically inserted into the schema during the initial conversion.
- A {{StreamTableSource}} with rowtime fields (i.e., fields in the table schema with type {{TimeIndicatorTypeInformation(true)}}) requires an additional interface {{DefinedRowtimeAttributes}}. The interface provides a {{RowtimeAttributeDescriptor}} for each rowtime field (right now only a single rowtime field is supported but we are prepared for more). The {{RowtimeAttributeDescriptor}} provides a {{TimestampExtractor}} which gives a {{RexNode}} expression to extract the timestamp field from the input type. A corresponding expression is code-gen'd into the initial conversion and executed by the table scan operator. Moreover, {{RowtimeAttributeDescriptor}} gives a {{WatermarkStrategy}} which is used to generate watermarks for a rowtime attribute. Watermarks are also generated by the table scan operator.
- We can provide built-in implementations for {{TimestampExtractor}} (right now only {{ExistingField}}, which converts a {{Long}} or {{Timestamp}} attribute into a rowtime attribute) and {{WatermarkStrategy}} (right now {{AscendingWatermarks}} and {{BoundedOutOfOrderWatermarks}}). Additional implementations can be added in later PRs.

I think the proposal is a good solution because:

- The table schema is explicitly defined by the {{TableSource}} and not by the API internals as before (handling of time attributes is currently hidden). This will make {{CREATE TABLE}} DDL statements easier.
- Timestamp extraction and watermark assignment can be configured by extensible interfaces without changing the API internals. The actual timestamp extraction and watermark generation happen in the table scan operator, so the {{TableSource}} does not have to deal with it.
- Timestamp extraction happens via {{RexNode}} expressions. So this is very versatile and it should be possible to call UDFs if these had been registered before (need to check this though).
- We can provide built-in implementations for the most common strategies. I think all requirements that we had listed in this issue before should be solvable with this design.
- Projection push-down can be enabled for table sources with timestamp attributes, which doesn't work right now (not part of the WIP branch).
- I could remove a few weird artifacts / workarounds of the current design that change the table schema internally.

Of course, there is also a downside:

- We are touching the {{TableSource}} interface and related interfaces. This is not a big deal for Flink's built-in sources (because there are not so many), but might be for users that have more {{TableSource}}s implemented.

Please let me know what you think [~jark], [~wheat9], [~twalthr], [~xccui], [~ykt836], and everybody else.
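As a rough picture of the division of labour proposed here, the following sketch lets a scan routine extract the timestamp from each row and feed it to a bounded-out-of-order strategy. It is not taken from the WIP branch: `RowtimeScanSketch`, `BoundedOutOfOrder`, and `scan` are hypothetical names, rows are plain `Object[]`, and the extractor is a Java function rather than a code-generated expression.

```java
import java.util.Arrays;
import java.util.function.ToLongFunction;

/** Hypothetical sketch, not Flink code: timestamp extraction and watermarking inside the scan. */
final class RowtimeScanSketch {

    /** Strategy whose watermark trails the largest timestamp seen so far by a fixed delay. */
    static final class BoundedOutOfOrder {
        private final long delayMillis;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedOutOfOrder(long delayMillis) {
            this.delayMillis = delayMillis;
        }

        void observe(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        long currentWatermark() {
            // Before the first row there is no meaningful watermark yet.
            return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - delayMillis;
        }
    }

    /** The "scan": extracts a timestamp per row and records the watermark after each row. */
    static long[] scan(Object[][] rows,
                       ToLongFunction<Object[]> extractor,
                       BoundedOutOfOrder strategy) {
        long[] watermarks = new long[rows.length];
        for (int i = 0; i < rows.length; i++) {
            strategy.observe(extractor.applyAsLong(rows[i]));
            watermarks[i] = strategy.currentWatermark();
        }
        return watermarks;
    }

    public static void main(String[] args) {
        Object[][] rows = {{"a", 1000L}, {"b", 3000L}, {"c", 2000L}};
        // The extractor reuses an existing Long field as the rowtime.
        long[] wm = scan(rows, row -> (Long) row[1], new BoundedOutOfOrder(500L));
        System.out.println(Arrays.toString(wm)); // prints [500, 2500, 2500]
    }
}
```

The source only supplies the two small objects; the loop that applies them belongs to the scan, which is the point of moving extraction and watermark generation out of the {{TableSource}}.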
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181847#comment-16181847 ] Xingcan Cui commented on FLINK-7548:

Thanks for the explanation, [~fhueske]. I was too paranoid about the problem.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181234#comment-16181234 ] Fabian Hueske commented on FLINK-7548: --

I think there might be a misunderstanding of the {{TimestampsAndPeriodicWatermarksOperator}}.

bq. About my last question, I actually refer to the TimestampsAndPeriodicWatermarksOperator. Here, the "periodic" refers to proctime. Considering the time systems for the rowtime and the proctime may not be synchronized (i.e., they get different speeds), could we consider providing a "rowtime periodic" assigner?

The assigner does not emit watermarks that have a processing time _value_; it only emits them in regular time intervals that are based on processing time. So, whenever the interval has passed, the operator asks the {{AssignerWithPeriodicWatermarks}} to return the current watermark. The watermark should be based on the event-time timestamps that the {{AssignerWithPeriodicWatermarks}} observed (in fact this depends on the implementation of the assigner). So the value of the watermarks is based on event time but the interval in which the watermarks are generated is not.

AFAIK, this is the most commonly used watermark generation strategy. So it seems to suit many users.
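The distinction between the watermark value (event time) and the emission interval (processing time) can be illustrated with a small simulation. This is a simplified sketch and not the Flink operator: `PeriodicWatermarkSketch`, `MaxTimestampAssigner`, and `PeriodicEmitter` are hypothetical names, processing time is passed in explicitly instead of read from a clock, and the interval is checked when an element arrives rather than by a timer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not Flink code: event-time watermark values, processing-time interval. */
final class PeriodicWatermarkSketch {

    /** Tracks event time only; it does not know when it will be asked for a watermark. */
    static final class MaxTimestampAssigner {
        private long maxTimestamp = Long.MIN_VALUE;

        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        long getCurrentWatermark() {
            return maxTimestamp;
        }
    }

    /** Asks the assigner for a watermark whenever intervalMillis of processing time has passed. */
    static final class PeriodicEmitter {
        private final long intervalMillis;
        private final MaxTimestampAssigner assigner;
        private final List<Long> emitted = new ArrayList<>();
        private long lastEmitProcTime;

        PeriodicEmitter(long intervalMillis, MaxTimestampAssigner assigner, long startProcTime) {
            this.intervalMillis = intervalMillis;
            this.assigner = assigner;
            this.lastEmitProcTime = startProcTime;
        }

        void onElement(long eventTimestamp, long procTime) {
            assigner.onEvent(eventTimestamp);
            if (procTime - lastEmitProcTime >= intervalMillis) {
                emitted.add(assigner.getCurrentWatermark());
                lastEmitProcTime = procTime;
            }
        }

        List<Long> getEmitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        PeriodicEmitter emitter = new PeriodicEmitter(200L, new MaxTimestampAssigner(), 0L);
        // Arguments: (event-time timestamp, processing time at arrival).
        emitter.onElement(5000L, 50L);
        emitter.onElement(7000L, 120L);
        emitter.onElement(6000L, 210L); // 200 ms of processing time passed: emits 7000
        emitter.onElement(9000L, 300L);
        emitter.onElement(9500L, 420L); // another 200 ms passed: emits 9500
        System.out.println(emitter.getEmitted()); // prints [7000, 9500]
    }
}
```

The emitted values are event-time timestamps in the thousands, while the decision when to emit depends only on the much smaller processing-time readings.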
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180077#comment-16180077 ] Xingcan Cui commented on FLINK-7548:

Thanks for the answers, [~fhueske]. Some extra comments.

About the first question, we must admit that a rowtime field has a dual nature. On one hand, it represents the rowtime and should be treated as the {{Long}} type. On the other hand, it is a common field that has its own type ({{Timestamp}} or {{Long}} and maybe more in the future?). We don't want to perform an extra type check when using this field as the rowtime field, and we also don't want to lose the original data type when using it as a common field (e.g., when it is passed to a UDF which formats a timestamp). Of course, if all the types are internally represented as {{Long}}, we can just give the fields different time indicators so that we can recover the original data type after the processing.

The record-number-bounded out-of-order generation strategy is something like: we don't emit a watermark {{w}} until a specified number of records whose timestamps are greater than {{w}} have arrived. Just an idea that hit me :P

About my last question, I actually refer to the {{TimestampsAndPeriodicWatermarksOperator}}. Here, the "periodic" refers to proctime. Considering that the time systems for the rowtime and the proctime may not be synchronized (i.e., they advance at different speeds), could we consider providing a "rowtime periodic" assigner?
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179836#comment-16179836 ] Fabian Hueske commented on FLINK-7548: --

Thanks for your thoughts [~xccui]. I added a few comments to your suggestions / questions.

Thanks, Fabian

bq. Considering that the data type should be preserved, it may bring extra logic if we do that internally. To keep the consistency, I wonder if it's possible to encapsulate the time into a new Rowtime type. It exposes two methods, getTime(): Long for logical level use and getValue(): T for physical level use.

In fact, {{Long}} and {{Timestamp}} have the same internal representation, namely {{Long}}. The issue is rather the type that is exposed to SQL or the Table API. We would need a new TimeIndicator type that exposes a timestamp as {{Long}}.

bq. Besides, I think the watermark generation should not be bound with rowtime extraction. Compared with implementing them in a single scan operator (not sure if I understood correctly), I prefer to generate watermarks in extra operators. That should be more flexible.

Timestamp extraction and watermark generation would not be tied together. First, we would compute timestamps (only necessary if we don't use an existing field). The next step would generate watermarks. However, both operations would happen in the logical scan operator because a single operator can be translated into multiple DataStream operations.

bq. I am thinking of a new record number bounded out-of-order generation strategy. Do you think it will be useful in real applications?

How would this strategy work? IMO, built-in strategies should have a concrete use case in mind which is common enough to justify a built-in primitive.

bq. I still feel that the machine time is not compatible with the rowtime watermark generation. Shall we consider getting rid of it?

Machine time (assuming that you refer to processing time here) does not use watermarks. Watermarks are only used for event-time processing.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176020#comment-16176020 ]

Xingcan Cui commented on FLINK-7548:

Hi [~fhueske] and [~jark], sorry for the late reply. Honestly, I don't have much experience with real applications, but I think the proposal is quite reasonable. In brief, any (existing or computed) field with the type {{LONG}} or {{TIMESTAMP}} can be taken as a rowtime field, right?

Here are my thoughts:

1. Considering that the data type should be preserved, it may bring extra logic if we do that internally. To keep the consistency, I wonder if it's possible to encapsulate the time into a new {{Rowtime}} type. It exposes two methods, {{getTime(): Long}} for logical level use and {{getValue(): T}} for physical level use.
2. Besides, I think the watermark generation should not be bound to rowtime extraction. Compared with implementing them in a single scan operator (not sure if I understood correctly), I prefer to generate watermarks in extra operators. That should be more flexible.
3. I am thinking of a new record number bounded out-of-order generation strategy. Do you think it will be useful in real applications?
4. I still feel that the machine time is not compatible with the rowtime watermark generation. Shall we consider getting rid of it?
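The {{Rowtime}} wrapper proposed in point 1 could look roughly like the sketch below. It is only an illustration of the idea: the class, its factory method {{of}}, and the demo class are hypothetical and not part of Flink. It shows that a {{Long}} field and a {{Timestamp}} field can expose the same long value at the logical level while the original value keeps its type.

```java
import java.sql.Timestamp;
import java.util.function.ToLongFunction;

// Hypothetical wrapper (not a Flink class): keeps the original field value
// and exposes a long view of it for time-based operations.
final class Rowtime<T> {
    private final T value;
    private final long time;

    private Rowtime(T value, long time) {
        this.value = value;
        this.time = time;
    }

    // toMillis converts the physical value into epoch milliseconds.
    static <T> Rowtime<T> of(T value, ToLongFunction<T> toMillis) {
        return new Rowtime<>(value, toMillis.applyAsLong(value));
    }

    // Logical-level view: always a long.
    long getTime() {
        return time;
    }

    // Physical-level view: the original value with its original type.
    T getValue() {
        return value;
    }
}

class RowtimeDemo {
    public static void main(String[] args) {
        Rowtime<Long> fromLong =
            Rowtime.of(Long.valueOf(1506000000000L), Long::longValue);
        Rowtime<Timestamp> fromTimestamp =
            Rowtime.of(new Timestamp(1506000000000L), Timestamp::getTime);

        // Both fields agree on the logical time, but keep their own types.
        System.out.println(fromLong.getTime() == fromTimestamp.getTime());
        System.out.println(fromTimestamp.getValue().getClass().getSimpleName());
    }
}
```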
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174581#comment-16174581 ]

Fabian Hueske commented on FLINK-7548:

Thanks for the feedback [~jark]!

bq. Regarding the existing field type, should we support Timestamp type? Timestamp is a very common type in traditional databases.

You mean to support {{TIMESTAMP}} in addition to {{LONG}} as an existing timestamp column? I think that's a good idea. I will address that in FLINK-7446.

bq. What about making TableSource support "Computed Column" instead of UDF converter?

Initially, I was also thinking about arbitrary expressions for column conversions. The "problem" would be that {{TableSource}} (and the {{DefinedRowtimeAttribute}} interface) are shared among Table API and SQL, but Table API and SQL expressions are different. The easiest option would be to use the Table API expression parser, but this would be inconsistent with SQL. I proposed using a UDF because this would avoid the problem at the cost of implementing a UDF for each conversion. I'm definitely open to suggestions here.

bq. I agree to preserve the type.

Yes, I totally see that. I'd solve this in a follow-up issue because I think we'd need a different time indicator type for that and some changes would be more involved.
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171634#comment-16171634 ]

Jark Wu commented on FLINK-7548:

Hi [~fhueske], thanks for the proposal. I really like it! It meets most of our requirements. I have some additional thoughts:

Regarding the existing field type, should we support {{Timestamp}} type? Timestamp is a very common type in traditional databases.

The computed timestamp is a very useful feature. We implemented it in Blink SQL, where it is called "Computed Column": a new field which is computed based on one or more existing fields. The computed column can be used as a timestamp field but also as a normal field. The computed column is computed using an expression (including UDF). What about making TableSource support "Computed Column" instead of UDF converter?

{quote}
preserve type of time indicator fields. At the moment, a field of type ROWTIME_INDICATOR is exposed to the user as TIMESTAMP. However, a time field is initially of type LONG such that the type exposed to the user is changed. We might want to add a time indicator that exposes itself as LONG.
{quote}

I agree to preserve the type. It is very strange that a field of type long becomes {{Timestamp}} when it is treated as a timestamp field.

What do you think about this [~fhueske], [~xccui], [~wheat9]?
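As a rough illustration of the "Computed Column" idea described above (a new field derived from one or more existing fields by an expression), here is a minimal, dependency-free sketch. It does not show Blink SQL or any Flink API: the {{ComputedColumn}} class, the row-as-map representation, and the field names {{epochSeconds}}, {{millis}} and {{rt}} are all assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a computed column: a field name plus an expression
// that is evaluated against the existing fields of a row.
final class ComputedColumn {
    private final String name;
    private final Function<Map<String, Object>, Object> expression;

    ComputedColumn(String name, Function<Map<String, Object>, Object> expression) {
        this.name = name;
        this.expression = expression;
    }

    // Returns a copy of the row with the computed field appended;
    // the input row is left unchanged.
    Map<String, Object> appendTo(Map<String, Object> row) {
        Map<String, Object> result = new LinkedHashMap<>(row);
        result.put(name, expression.apply(row));
        return result;
    }
}

class ComputedColumnDemo {
    public static void main(String[] args) {
        // Derive a millisecond rowtime from two existing fields.
        ComputedColumn rowtime = new ComputedColumn(
            "rt",
            row -> (Long) row.get("epochSeconds") * 1000L + (Integer) row.get("millis"));

        Map<String, Object> row = new LinkedHashMap<>();
        row.put("epochSeconds", 1506000000L);
        row.put("millis", 250);

        // The result carries the original fields plus the computed "rt" field.
        System.out.println(rowtime.appendTo(row));
    }
}
```

The computed field here could serve as the timestamp field, but nothing prevents using it as a normal field, which matches the description above.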
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171469#comment-16171469 ]

Fabian Hueske commented on FLINK-7548:

At the moment, we expect a {{TableSource}} to assign timestamps and watermarks itself. The goal for this JIRA should be to move the timestamp and watermark generation into the Scan operator. So the Scan operator would receive the {{DataStream}} from the {{TableSource}} that has no ({{StreamRecord}}) timestamps and watermarks and generate watermarks (and timestamps) based on that. For this we need to provide information about the timestamp field and the generation of watermarks.

I collected a few requirements from discussions I had:

1. the timestamp field can be defined as one of the following:
- An existing field of type {{LONG}}. The type of the existing field is converted to {{ROWTIME_INDICATOR}} (this conversion is only on the logical level, as timestamps are internally handled as LONG).
- An existing field that must be converted using a scalar UDF with a single input parameter (of the type of the existing field) that returns a {{LONG}}. The input field is replaced by the result of the UDF. The name of the input field is preserved and the logical type of the field becomes {{ROWTIME_INDICATOR}}.
- A new field that is computed using a scalar UDF with one or more input parameters. The result of the UDF is of type {{LONG}} and appended as a new field to the schema of the row. The name of the new field must be provided and the logical type is {{ROWTIME_INDICATOR}}.

2. the watermarks are computed based on the (computed) timestamp field.
- There are some common watermark strategies that should be supported:
-- (periodic) ascending watermarks
-- (periodic) bounded out-of-order watermarks
- Custom watermark logic should be supported as well
-- periodic watermarks
-- punctuated watermarks

Additional thoughts / future requirements / open questions:
- preserve type of time indicator fields. At the moment, a field of type {{ROWTIME_INDICATOR}} is exposed to the user as {{TIMESTAMP}}. However, a time field is initially of type {{LONG}} such that the type exposed to the user is changed. We might want to add a time indicator that exposes itself as {{LONG}}.
- do we need support for time indicator fields in nested fields (with replacement)?

What do you think of this [~wheat9], [~jark], [~xccui]? Would that meet your requirements?
Once we agree on the requirements (and scope) of this issue, we can continue to define the interface.
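The two common periodic strategies named in requirement 2 can be sketched without any Flink dependency as follows. This is a simplified model under stated assumptions, not the interface this issue will define: {{PeriodicWatermarks}}, {{AscendingWatermarks}} and {{BoundedOutOfOrderWatermarks}} are hypothetical names. Each strategy observes the timestamp of every row and is polled for its current watermark at some interval.

```java
// Hypothetical, dependency-free model of a periodic watermark strategy.
interface PeriodicWatermarks {
    // Called for every row with the value of its timestamp field.
    void onTimestamp(long timestamp);

    // Polled periodically; Long.MIN_VALUE means "no watermark yet".
    long currentWatermark();
}

// Ascending: assumes rows arrive in timestamp order. The watermark trails the
// highest timestamp by 1 so that further rows with the same timestamp are not late.
final class AscendingWatermarks implements PeriodicWatermarks {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onTimestamp(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    @Override
    public long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }
}

// Bounded out-of-order: rows may be late by at most maxDelay milliseconds,
// so the watermark trails the highest timestamp by that bound.
final class BoundedOutOfOrderWatermarks implements PeriodicWatermarks {
    private final long maxDelay;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrderWatermarks(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public void onTimestamp(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    @Override
    public long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - maxDelay;
    }
}

class WatermarkStrategiesDemo {
    public static void main(String[] args) {
        PeriodicWatermarks bounded = new BoundedOutOfOrderWatermarks(1500L);
        for (long ts : new long[] {1000L, 3000L, 2000L, 5000L}) {
            bounded.onTimestamp(ts);
            // The out-of-order row (2000) does not move the watermark backwards.
            System.out.println("ts=" + ts + " watermark=" + bounded.currentWatermark());
        }
    }
}
```

A punctuated strategy would differ in that it decides per row whether to emit a watermark instead of being polled.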
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144836#comment-16144836 ]

Xingcan Cui commented on FLINK-7548:

Thanks for the issue [~jark]. I'd like to share some ideas about the problem.

1. In principle, each rowtime field should be "guarded" with a set of watermarks. Although we support multiple rowtime fields now, once two streams are connected, their watermarks will be forcibly merged. As a consequence, the initial watermarks may not be used in the following calculations. Shall we consider re-generating them?

2. The current periodic watermark assigner is based on machine time. I'm not sure if it is applicable to rowtime since the rowtime and machine time may not be synchronized. For example, if the stream is sourced from a historical queue, it may feed into the system at maximum speed, so the machine-time-based watermark assigner may not work properly (e.g., we may generate a watermark with a 1 hour rowtime span in 5 seconds). How about using a rowtime-based periodic assigner with the following framework?

{code:scala}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Emits a watermark whenever the rowtime has advanced by at least `interval`
// since the last emitted watermark. `Order` is the record type; `rt` is its
// rowtime field.
class WatermarksAssigner(interval: Long) extends AssignerWithPunctuatedWatermarks[Order] {
  // Last emitted watermark.
  var currentWatermark: Long = 0

  override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
    element.rt
  }

  override def checkAndGetNextWatermark(lastElement: Order, extractedTimestamp: Long): Watermark = {
    if (extractedTimestamp >= currentWatermark + interval) {
      // Advance by a whole number of intervals.
      currentWatermark = currentWatermark + ((extractedTimestamp - currentWatermark) / interval) * interval
      new Watermark(currentWatermark)
    } else {
      // No watermark for this element.
      null
    }
  }
}
{code}

BTW, I'm quite interested in this issue. Can I take it?

Thanks, Xingcan
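To make the rowtime-based interval idea from point 2 concrete, the following dependency-free sketch traces it on a few timestamps. It assumes the intent is to emit a watermark whenever the rowtime has advanced by at least {{interval}} since the last emitted watermark, and to advance in whole multiples of {{interval}}. The class {{RowtimeIntervalWatermarks}} and the sample timestamps are made up for illustration and do not use the Flink API.

```java
import java.util.OptionalLong;

// Hypothetical model: emits a watermark only when rowtime (not machine time)
// has advanced by at least `interval` since the last emitted watermark.
final class RowtimeIntervalWatermarks {
    private final long interval;
    private long currentWatermark = 0L;

    RowtimeIntervalWatermarks(long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    // Returns the new watermark if one is due for this timestamp, else empty.
    OptionalLong onTimestamp(long timestamp) {
        if (timestamp >= currentWatermark + interval) {
            // Advance by a whole number of intervals.
            long steps = (timestamp - currentWatermark) / interval;
            currentWatermark += steps * interval;
            return OptionalLong.of(currentWatermark);
        }
        return OptionalLong.empty();
    }
}

class RowtimeIntervalDemo {
    public static void main(String[] args) {
        RowtimeIntervalWatermarks assigner = new RowtimeIntervalWatermarks(1000L);
        for (long ts : new long[] {500L, 1200L, 1900L, 4500L}) {
            OptionalLong watermark = assigner.onTimestamp(ts);
            System.out.println("ts=" + ts + " -> "
                + (watermark.isPresent() ? "watermark " + watermark.getAsLong() : "no watermark"));
        }
    }
}
```

With an interval of 1000, the timestamps 500 and 1900 produce no watermark, 1200 produces watermark 1000, and 4500 jumps three intervals to watermark 4000. The pace of emission follows the data, so replaying a historical queue at full speed yields the same watermarks as reading it live.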