[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138053#comment-16138053 ] ASF GitHub Bot commented on FLINK-7337: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4532 > Refactor handling of time indicator attributes > -- > > Key: FLINK-7337 > URL: https://issues.apache.org/jira/browse/FLINK-7337 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > After a [discussion on the dev mailing > list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E] > I propose the following changes to the current handling of time indicator > attributes: > * Remove the separation of logical and physical row type. > ** Hold the event-time timestamp as regular Long field in Row > ** Represent the processing-time indicator type as a null-valued field in Row > (1 bit overhead) > * Remove materialization of event-time timestamps because timestamp is > already accessible in Row. > * Add {{ProcessFunction}} to set timestamp into the timestamp field of a > {{StreamRecord}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
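The proposed physical layout can be illustrated with a small, self-contained model. This is a sketch under stated assumptions. `PhysicalRow`, `TimedRecord`, `makeRow` and `assignTimestamp` are hypothetical stand-ins for Flink's `Row`, `StreamRecord` and the proposed `ProcessFunction`; they are not Flink APIs. In the model, the event-time attribute is an ordinary `Long` field and the processing-time attribute is only a `null` placeholder, which the proposal says costs about one bit. A wrapper copies the rowtime field into the record's timestamp slot instead of materializing it separately.

```scala
// Hypothetical, simplified model of the proposed row layout; not Flink's API.
object TimeIndicatorLayout {

  // Stand-in for org.apache.flink.types.Row: positional fields of any type.
  final case class PhysicalRow(fields: Vector[Any]) {
    def get(i: Int): Any = fields(i)
  }

  // Stand-in for a StreamRecord: a value plus an optional timestamp.
  final case class TimedRecord[T](value: T, timestamp: Option[Long])

  // Builds a row for the schema (user, amount, rowtime, proctime):
  // event time is a plain Long field, processing time is a null placeholder.
  def makeRow(user: String, amount: Int, eventTimeMillis: Long): PhysicalRow =
    PhysicalRow(Vector(user, amount, eventTimeMillis, null))

  // Mimics the proposed function: read the Long at rowtimeIdx and attach it
  // as the record timestamp. Non-Long values yield no timestamp.
  def assignTimestamp(row: PhysicalRow, rowtimeIdx: Int): TimedRecord[PhysicalRow] =
    row.get(rowtimeIdx) match {
      case ts: Long => TimedRecord(row, Some(ts))
      case _        => TimedRecord(row, None)
    }

  def main(args: Array[String]): Unit = {
    val rec = assignTimestamp(makeRow("alice", 3, 1500000000000L), rowtimeIdx = 2)
    println(rec.timestamp) // Some(1500000000000)
  }
}
```

Keeping the timestamp inside the row means no extra conversion step is needed until a record leaves the Table API.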
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138052#comment-16138052 ] ASF GitHub Bot commented on FLINK-7337: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4488
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138039#comment-16138039 ] ASF GitHub Bot commented on FLINK-7337: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4532 Merging...
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126865#comment-16126865 ] ASF GitHub Bot commented on FLINK-7337: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/4532 I'm +1 to merge this. I have created two follow-up issues, so we can move the discussion to those JIRAs: (1) FLINK-7446 Support to define an existing field as the rowtime field for TableSource (2) FLINK-7448 Keep the data type unchanged when register an existing field as rowtime
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126721#comment-16126721 ] ASF GitHub Bot commented on FLINK-7337: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4488#discussion_r133104586 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala --- @@ -46,29 +47,29 @@ class StreamTableSourceScan( val fieldNames = TableEnvironment.getFieldNames(tableSource).toList val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList -val fieldCnt = fieldNames.length +val fields = fieldNames.zip(fieldTypes) -val rowtime = tableSource match { +val withRowtime = tableSource match { case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null => --- End diff -- Sure, I have logged it https://issues.apache.org/jira/browse/FLINK-7446
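The scan code in this diff zips field names with field types and then, if the table source declares a rowtime attribute, derives the table schema from it. The following is a hypothetical pure-Scala sketch of that idea. `RowtimeSchema`, `FieldType` and `withRowtime` are illustrative names, not Flink's API. The "append a new field" branch reflects how a declared rowtime name can extend the schema; the "re-type an existing field" branch is an assumption modeled on what FLINK-7446 and FLINK-7448 discuss, not behavior confirmed by this diff.

```scala
// Hypothetical sketch; types and names are illustrative, not Flink's API.
object RowtimeSchema {

  sealed trait FieldType
  case object StringType  extends FieldType
  case object LongType    extends FieldType
  case object RowtimeType extends FieldType // time indicator, internally a Long

  // fieldNames/fieldTypes describe the source's physical fields; rowtimeAttr is
  // the optional attribute name a source would declare.
  def withRowtime(
      fieldNames: List[String],
      fieldTypes: List[FieldType],
      rowtimeAttr: Option[String]): List[(String, FieldType)] = {
    val fields = fieldNames.zip(fieldTypes)
    rowtimeAttr match {
      // Existing name (assumed behavior): turn that field into a time indicator.
      case Some(name) if fieldNames.contains(name) =>
        fields.map { case (n, t) => if (n == name) (n, RowtimeType) else (n, t) }
      // New name: append a rowtime field at the end of the schema.
      case Some(name) => fields :+ (name -> RowtimeType)
      // No rowtime attribute declared.
      case None => fields
    }
  }
}
```

Re-typing an existing field is exactly where the data type question of FLINK-7448 arises, because the field's declared type changes from `LongType` to a time indicator.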
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126263#comment-16126263 ] ASF GitHub Bot commented on FLINK-7337: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r133037759 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment( // get CRow plan val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) +val rowtimeFields = logicalType + .getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +// convert the input type for the conversion mapper +// the input will be changed in the OutputRowtimeProcessFunction later +val convType = if (rowtimeFields.size > 1) { + throw new TableException( --- End diff -- What would happen if the leftmost table does not have a time attribute (or if it is projected out)? I just think that the semantics of the `StreamRecord` timestamps are too important to have an implicit behavior that is hard to explain and reason about for users. IMO, an exception that asks for explicit user input is the better choice compared to a behavior that depends on non-obvious query characteristics and is hard to predict.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125817#comment-16125817 ] ASF GitHub Bot commented on FLINK-7337: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4532 @wuchong @fhueske I hope I addressed all code-related issues. Is it ok to merge this for now? I will create a follow-up issue for the Table to DataStream/TableSink conversion case. > whether we should change the rowtime type when it is an existing field I think this is a very special case, and it is just a nice addition to make the user's life easier. We could also remove the replacing feature as a whole to avoid confusion due to the data type conversion. In general, we should get rid of `TIMESTAMP` and work on longs as much as possible. In the near future, we might also extend the API to use Java 8 `java.time.` equivalents.
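The trade-off between working on longs and exposing `TIMESTAMP`, plus the `java.time` alternative mentioned above, can be sketched with JDK classes only. `TimeConversions` and its methods are hypothetical helpers. The sketch assumes the internal value is epoch milliseconds in UTC and ignores any time-zone adjustment a real Table API conversion may perform.

```scala
import java.sql.Timestamp
import java.time.Instant

// Hypothetical helpers; assumes internal values are epoch milliseconds (UTC)
// and ignores any time-zone shifting a real conversion might apply.
object TimeConversions {

  // Internal LONG -> external TIMESTAMP (java.sql.Timestamp).
  def toSqlTimestamp(millis: Long): Timestamp = new Timestamp(millis)

  // External TIMESTAMP -> internal LONG.
  def fromSqlTimestamp(ts: Timestamp): Long = ts.getTime

  // Java 8 java.time equivalent of the same instant.
  def toInstant(millis: Long): Instant = Instant.ofEpochMilli(millis)

  def main(args: Array[String]): Unit = {
    val millis = 1502668800000L
    println(toInstant(millis)) // 2017-08-14T00:00:00Z
  }
}
```

Staying on `Long` avoids allocating a `Timestamp` object per record; the conversion only has to happen at the boundary where users see the value.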
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125801#comment-16125801 ] ASF GitHub Bot commented on FLINK-7337: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4488#discussion_r132974743 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala --- @@ -46,29 +47,29 @@ class StreamTableSourceScan( val fieldNames = TableEnvironment.getFieldNames(tableSource).toList val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList -val fieldCnt = fieldNames.length +val fields = fieldNames.zip(fieldTypes) -val rowtime = tableSource match { +val withRowtime = tableSource match { case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null => --- End diff -- Can you open an issue for this? We can discuss this after merging this PR.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125788#comment-16125788 ] ASF GitHub Bot commented on FLINK-7337: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132971979 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment( // get CRow plan val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) +val rowtimeFields = logicalType + .getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +// convert the input type for the conversion mapper +// the input will be changed in the OutputRowtimeProcessFunction later +val convType = if (rowtimeFields.size > 1) { + throw new TableException( --- End diff -- @fhueske By "leftmost table" I mean the first time indicator in the select statement (counting from the left). I think even join reordering does not change the column ordering. I agree that at least `TableSink`s should deal with it implicitly.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125340#comment-16125340 ] ASF GitHub Bot commented on FLINK-7337: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132889590 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment( // get CRow plan val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) +val rowtimeFields = logicalType + .getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +// convert the input type for the conversion mapper +// the input will be changed in the OutputRowtimeProcessFunction later +val convType = if (rowtimeFields.size > 1) { + throw new TableException( --- End diff -- @wuchong let's rework the restrictions for a `TableSink` in a follow-up issue.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125338#comment-16125338 ] ASF GitHub Bot commented on FLINK-7337: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132889039 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) { throw new TableException("Field name can not be '*'.") } -(fieldNames.toArray, fieldIndexes.toArray) +(fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array --- End diff -- See https://travis-ci.org/apache/flink/jobs/263806347 It only happens when using Scala 2.10.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125142#comment-16125142 ] ASF GitHub Bot commented on FLINK-7337: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132863952 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala --- @@ -22,12 +22,12 @@ import org.apache.calcite.sql.`type`._ import org.apache.calcite.sql.validate.SqlMonotonicity /** - * Function that materializes a time attribute to the metadata timestamp. After materialization - * the result can be used in regular arithmetical calculations. + * Function that materializes a processing time attribute. + * After materialization the result can be used in regular arithmetical calculations. */ -object TimeMaterializationSqlFunction +object ProctimeSqlFunction --- End diff -- Should we move this object to the `org.apache.flink.table.functions.sql` package?
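Because a processing-time indicator is only a `null` placeholder in the row, it has to be materialized before it can take part in arithmetic, which is what the renamed `ProctimeSqlFunction` is documented to do. The sketch below models that idea in plain Scala. `ProctimeMaterialization`, `materializeProctime` and the injected clock are hypothetical and do not reflect how Flink actually evaluates the function.

```scala
// Hypothetical model of proctime materialization; not Flink's implementation.
object ProctimeMaterialization {

  // The row carries null where the processing-time indicator sits.
  // Materializing replaces that placeholder with the current time (millis),
  // after which the value can be used in ordinary arithmetic.
  def materializeProctime(fields: Vector[Any], proctimeIdx: Int, now: () => Long): Vector[Any] = {
    require(fields(proctimeIdx) == null, "expected a null proctime placeholder")
    fields.updated(proctimeIdx, now())
  }

  def main(args: Array[String]): Unit = {
    val row = Vector[Any]("alice", null)
    val materialized = materializeProctime(row, 1, () => System.currentTimeMillis())
    val ageMillis = System.currentTimeMillis() - materialized(1).asInstanceOf[Long]
    println(s"proctime materialized, age = $ageMillis ms")
  }
}
```

Injecting the clock as a function keeps the sketch deterministic in tests; a real operator would read the processing time from its runtime context.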
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125061#comment-16125061 ] ASF GitHub Bot commented on FLINK-7337: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132854814 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.calcite.runtime.SqlFunctions +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.util.Collector + +/** + * Wraps a ProcessFunction and sets a Timestamp field of a CRow as + * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp. 
+ */ +class OutputRowtimeProcessFunction[OUT]( +function: MapFunction[CRow, OUT], +rowtimeIdx: Int) --- End diff -- In fact, the internal data type is changed in order to keep the external data type unchanged. A time indicator is externally represented and treated as a `TIMESTAMP` field and only internally handled as `LONG`. Therefore, we need to convert it into a `TIMESTAMP` once the result is converted into a `DataStream`. You are right that we need to convert all time indicators to `TIMESTAMP` and not only one. This is currently enforced by the exception that you observed. Currently, users have to cast all but one time indicator attribute to `TIMESTAMP`. That will also convert them from `long` to `Timestamp`.
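The conversion described in this comment, using the internal `LONG` as the record timestamp while handing a `TIMESTAMP` to the wrapped function, can be modeled as follows. It is a sketch under assumptions: `OutputRowtimeModel`, `Emitted` and `convertOutput` are hypothetical, a row is modeled as `Vector[Any]`, and the real `OutputRowtimeProcessFunction` operates on `CRow` with Flink's collector instead.

```scala
import java.sql.Timestamp

// Hypothetical model of the output conversion; not Flink's operator.
object OutputRowtimeModel {

  // What downstream would see: the user function's result plus the
  // timestamp that would be attached to the StreamRecord.
  final case class Emitted[OUT](value: OUT, recordTimestamp: Long)

  // rowtimeIdx points at the single remaining time indicator (an internal Long).
  // It becomes the record timestamp, and the field is handed to the user
  // function as a Timestamp so the external type stays TIMESTAMP.
  def convertOutput[OUT](
      row: Vector[Any],
      rowtimeIdx: Int,
      userFunction: Vector[Any] => OUT): Emitted[OUT] = {
    val internal = row(rowtimeIdx).asInstanceOf[Long]
    val external = row.updated(rowtimeIdx, new Timestamp(internal))
    Emitted(userFunction(external), internal)
  }
}
```

With more than one rowtime field, a single `rowtimeIdx` would leave the other indicators as raw longs, which is why the other indicators have to be cast to `TIMESTAMP` first.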
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125053#comment-16125053 ] ASF GitHub Bot commented on FLINK-7337: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132854238 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment( // get CRow plan val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) +val rowtimeFields = logicalType + .getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +// convert the input type for the conversion mapper +// the input will be changed in the OutputRowtimeProcessFunction later +val convType = if (rowtimeFields.size > 1) { + throw new TableException( --- End diff -- I think we should avoid implicit defaults like using the timestamp attribute of the leftmost table (the leftmost table might not have a time indicator attribute, and join order optimization would change the order of tables) and special cases for queries like `SELECT *`. When a `Table` is converted into a `DataStream`, it is likely that the resulting stream is further processed by logic that cannot be expressed in SQL / Table API. If a `Table` has multiple timestamp attributes, IMO a user should be forced to make a choice for the `StreamRecord` timestamp, because the semantics of any subsequent time-based operations will depend on that. I see two ways to do that: - ensure that only one attribute is a time indicator by casting the others to `TIMESTAMP` - let the user specify which field should be used as timestamp as an additional parameter of the `toAppendStream` and `toRetractStream` methods. We could also do both. I agree with @wuchong that we do not need this restriction when we emit a `Table` to a `TableSink`.
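The two options listed in this comment, forcing a single time indicator or letting the caller name the timestamp field, could be combined in one validation step like the one below. This is a hypothetical sketch: `RowtimeSelection`, `Field`, `selectRowtimeIndex` and its `explicitField` parameter do not exist on `toAppendStream` or `toRetractStream`. The error message mirrors the `TableException` text quoted in the PR diff, with `IllegalArgumentException` standing in for that exception.

```scala
// Hypothetical validation sketch; not part of Flink's API.
object RowtimeSelection {

  final case class Field(name: String, isRowtime: Boolean)

  // Returns the index of the field that should become the StreamRecord
  // timestamp, or None if the table has no rowtime field at all.
  def selectRowtimeIndex(fields: Seq[Field], explicitField: Option[String] = None): Option[Int] = {
    val rowtimeIdxs = fields.indices.filter(i => fields(i).isRowtime)
    explicitField match {
      // Option 2: the user names the timestamp field explicitly.
      case Some(name) =>
        val idx = fields.indexWhere(f => f.name == name && f.isRowtime)
        if (idx < 0) throw new IllegalArgumentException(s"'$name' is not a rowtime field.")
        Some(idx)
      // Option 1: without an explicit choice, at most one time indicator may remain.
      case None if rowtimeIdxs.size > 1 =>
        val names = rowtimeIdxs.map(i => fields(i).name).mkString(", ")
        throw new IllegalArgumentException(
          s"Found more than one rowtime field: [$names] in the table that should be " +
            "converted to a DataStream. Please select the rowtime field that should be " +
            "used as event-time timestamp for the DataStream by casting all other " +
            "fields to TIMESTAMP.")
      case None => rowtimeIdxs.headOption
    }
  }
}
```

Failing fast, rather than silently picking the first indicator, matches the argument that the `StreamRecord` timestamp should never depend on non-obvious query characteristics.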
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124929#comment-16124929 ] ASF GitHub Bot commented on FLINK-7337: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132842811 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) { throw new TableException("Field name can not be '*'.") } -(fieldNames.toArray, fieldIndexes.toArray) +(fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array --- End diff -- In my local environment, `toArray` also seems to be redundant.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124908#comment-16124908 ] ASF GitHub Bot commented on FLINK-7337: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132834552 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment( // get CRow plan val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) +val rowtimeFields = logicalType + .getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +// convert the input type for the conversion mapper +// the input will be changed in the OutputRowtimeProcessFunction later +val convType = if (rowtimeFields.size > 1) { + throw new TableException( +s"Found more than one rowtime field: [${rowtimeFields.map(_.getName).mkString(", ")}] in " + + s"the table that should be converted to a DataStream.\n" + + s"Please select the rowtime field that should be used as event-time timestamp for the " + + s"DataStream by casting all other fields to TIMESTAMP.") +} else if (rowtimeFields.size == 1) { + val origRowType = plan.getType.asInstanceOf[CRowTypeInfo].rowType + val convFieldTypes = origRowType.getFieldTypes.map { t => +if (FlinkTypeFactory.isRowtimeIndicatorType(t)) { --- End diff -- . 
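The rowtime check in the diff can be sketched in plain Scala, without Flink or Calcite on the classpath. The sketch assumes a simplified model. `Field` and the `TypeTag` objects are hypothetical stand-ins for Calcite's `RelDataTypeField` and Flink's time indicator types. It mirrors only the control flow: reject more than one rowtime field, otherwise expose the single rowtime indicator as a plain timestamp type for the conversion mapper.

```scala
object RowtimeConversionSketch {
  // Hypothetical type tags standing in for Flink type information.
  sealed trait TypeTag
  case object RowtimeIndicator extends TypeTag
  case object PlainTimestamp extends TypeTag
  case object LongType extends TypeTag

  // Hypothetical stand-in for a field of the logical row type.
  final case class Field(name: String, tpe: TypeTag)

  /** Returns the field types for the conversion mapper, or fails if the table has
    * more than one rowtime field (the DataStream API allows only one timestamp). */
  def conversionTypes(fields: Seq[Field]): Seq[TypeTag] = {
    val rowtimeFields = fields.filter(_.tpe == RowtimeIndicator)
    if (rowtimeFields.size > 1) {
      throw new IllegalArgumentException(
        s"Found more than one rowtime field: [${rowtimeFields.map(_.name).mkString(", ")}]")
    }
    // With zero or one rowtime field, any indicator is exposed as a plain timestamp.
    fields.map(f => if (f.tpe == RowtimeIndicator) PlainTimestamp else f.tpe)
  }
}
```

With zero rowtime fields the mapping leaves every type unchanged. This matches the diff, which only rewrites types when exactly one rowtime field is present.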
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124909#comment-16124909 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/4532#discussion_r132839934

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala ---
@@ -16,32 +16,32 @@
 * limitations under the License.
 */
-package org.apache.flink.table.runtime
+package org.apache.flink.table.runtime.conversion
 import java.lang.{Boolean => JBool}
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.slf4j.{Logger, LoggerFactory}
 /**
- * Convert [[CRow]] to a [[JTuple2]]
+ * Convert [[CRow]] to a [[JTuple2]].
 */
-class CRowInputJavaTupleOutputMapRunner(
+class CRowToJavaTupleMapRunner(
 name: String,
 code: String,
 @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
 extends RichMapFunction[CRow, Any]
 with ResultTypeQueryable[JTuple2[JBool, Any]]
 with Compiler[MapFunction[Row, Any]] {
--- End diff --

indent
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124911#comment-16124911 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/4532#discussion_r132841043

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
+val rowtimeFields = logicalType
+ .getFieldList.asScala
+ .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+ throw new TableException(
--- End diff --

I'm fine with both the current approach and using the time indicator of the left table as the default. However, I think no exception should be thrown when writing a Table to a TableSink. Currently, both cases share the same exception code path.
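One way to keep the TableSink path free of this exception is to make the conversion target explicit. The sketch below is hypothetical: neither `ConversionTarget` nor `resolveRowtimes` exists in Flink, and rows are reduced to per-field rowtime flags. Under these assumptions, a TableSink simply receives every field as a regular value. Only the DataStream path enforces the single-rowtime restriction, because only it must pick one StreamRecord timestamp.

```scala
object RowtimeTargetSketch {
  // Hypothetical conversion targets.
  sealed trait ConversionTarget
  case object ToDataStream extends ConversionTarget
  case object ToTableSink extends ConversionTarget

  /** Returns the index of the field that should become the StreamRecord timestamp,
    * if any. Only the DataStream path rejects multiple rowtime fields. */
  def resolveRowtimes(isRowtime: Seq[Boolean], target: ConversionTarget): Option[Int] = {
    val rowtimeIdxs = isRowtime.zipWithIndex.collect { case (true, i) => i }
    target match {
      case ToTableSink => None // sinks get all fields as regular values, no error
      case ToDataStream if rowtimeIdxs.size > 1 =>
        throw new IllegalArgumentException(
          s"Found more than one rowtime field at indices ${rowtimeIdxs.mkString(", ")}")
      case ToDataStream => rowtimeIdxs.headOption
    }
  }
}
```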
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124910#comment-16124910 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/4532#discussion_r132840719

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) {
 throw new TableException("Field name can not be '*'.")
 }
-(fieldNames.toArray, fieldIndexes.toArray)
+(fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array
--- End diff --

It builds successfully when I remove the `toArray` in my local environment.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124831#comment-16124831 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4532#discussion_r132833668

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+ * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+ * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
+ */
+class OutputRowtimeProcessFunction[OUT](
+function: MapFunction[CRow, OUT],
+rowtimeIdx: Int)
--- End diff --

It seems that this function only changes the data type of the rowtime field from Long to Timestamp. Shall we consider making the `rowtimeIdx` an array? Besides, as @wuchong suggested, I also think a query should keep the data type unchanged.
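The suggestion to turn `rowtimeIdx` into an array can be illustrated with a small, Flink-free sketch of the field-type conversion alone. It does not cover setting the StreamRecord timestamp. `convertRowtimeFields` is hypothetical and treats a row as an `Array[Any]`. The diff imports Calcite's `SqlFunctions`, which may apply a time-zone adjustment during conversion. This sketch calls `new Timestamp(millis)` directly and makes no such adjustment.

```scala
import java.sql.Timestamp

object RowtimeFieldsSketch {
  /** Hypothetical helper: returns a copy of `row` in which every field listed in
    * `rowtimeIdxs` is converted from epoch milliseconds (Long) to java.sql.Timestamp.
    * All other fields are copied unchanged; the input array is not modified. */
  def convertRowtimeFields(row: Array[Any], rowtimeIdxs: Array[Int]): Array[Any] = {
    val out = row.clone()
    for (i <- rowtimeIdxs) {
      out(i) = row(i) match {
        case millis: Long => new Timestamp(millis)
        case null => null
        case other =>
          throw new IllegalArgumentException(s"Field $i is not a Long: $other")
      }
    }
    out
  }
}
```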
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124639#comment-16124639 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4532#discussion_r132822158

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
+val rowtimeFields = logicalType
+ .getFieldList.asScala
+ .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+ throw new TableException(
--- End diff --

@xccui You just described exactly what this PR does :D We allow multiple rowtimes and store them in the record. Each operator picks what it needs. The exception you got and the problem I described both concern the final conversion from the Table API back to the DataStream API. The DataStream API only allows one timestamp.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124631#comment-16124631 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/4488

@wuchong could you take a look at #4532 as well? It is based on this PR.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124611#comment-16124611 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4532#discussion_r132820853

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
+val rowtimeFields = logicalType
+ .getFieldList.asScala
+ .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+ throw new TableException(
--- End diff --

I have an idea, but I'm not sure if it's applicable. We could allow multiple rowtime fields in a stream but activate only one per operator. Since the timestamps are stored in the records, the inactive rowtime fields can simply be treated as regular fields. Any change to a rowtime field would render it invalid for rowtime use. IMO, not many queries depend on the rowtime (maybe only over aggregates and joins), so the optimizer may be able to deduce which rowtime field should be activated in each operator. However, some existing logic may be affected by this.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124601#comment-16124601 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/4488#discussion_r132819122

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+ * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+ * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
+ */
+class WrappingTimestampSetterProcessFunction[OUT](
+function: MapFunction[CRow, OUT],
+rowtimeIdx: Int)
+ extends ProcessFunction[CRow, OUT] {
+
+ override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+function match {
--- End diff --

We could use `FunctionUtils` instead of the match expression, but I'm also fine with the match.

```scala
import org.apache.flink.api.common.functions.util.FunctionUtils

FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
FunctionUtils.openFunction(function, parameters)
```
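Both options amount to the same rule: lifecycle calls are forwarded to the wrapped function only if it is a rich function. The Flink-free sketch below spells out that rule. `Func` and `RichFunc` are hypothetical traits standing in for Flink's `Function` and `RichFunction`. The sketch only shows why a shared helper and an inline `match` are interchangeable here; it does not reproduce Flink's `FunctionUtils` implementation.

```scala
object LifecycleForwardingSketch {
  // Hypothetical stand-ins for Flink's Function / RichFunction hierarchy.
  trait Func
  trait RichFunc extends Func {
    def open(config: Map[String, String]): Unit
    def close(): Unit
  }

  // Forward open() only to rich functions; plain functions have no lifecycle.
  def openFunction(f: Func, config: Map[String, String]): Unit = f match {
    case rich: RichFunc => rich.open(config)
    case _ => ()
  }

  // Same rule for close().
  def closeFunction(f: Func): Unit = f match {
    case rich: RichFunc => rich.close()
    case _ => ()
  }
}
```

Centralizing the `match` in one helper keeps every wrapper (such as the process function in the diff) from repeating it.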
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124600#comment-16124600 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/4488#discussion_r132818559

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
@@ -46,29 +47,29 @@ class StreamTableSourceScan(
 val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
 val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
-val fieldCnt = fieldNames.length
+val fields = fieldNames.zip(fieldTypes)
-val rowtime = tableSource match {
+val withRowtime = tableSource match {
 case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
--- End diff --

For `DefinedRowtimeAttribute`, we would like the rowtime field to be able to replace an existing field. When registering a DataStream, the rowtime field can either be appended or replace an existing field, and a table source should work the same way. This is not related to this PR, but we can discuss it here and implement it in a separate issue.
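The behaviour proposed for `DefinedRowtimeAttribute` can be sketched as follows: append the rowtime attribute, or let it replace an existing field of the same name. `placeRowtime` and the string type names are hypothetical. Fields are modeled as `(name, type)` pairs, like the `fieldNames.zip(fieldTypes)` in the diff.

```scala
object RowtimePlacementSketch {
  val Rowtime = "ROWTIME" // hypothetical marker for the rowtime indicator type

  /** If a field named `rowtimeName` already exists, it is replaced in place by a
    * rowtime indicator; otherwise the rowtime attribute is appended at the end. */
  def placeRowtime(fields: List[(String, String)], rowtimeName: String): List[(String, String)] =
    if (fields.exists { case (name, _) => name == rowtimeName })
      fields.map { case (name, tpe) => if (name == rowtimeName) (name, Rowtime) else (name, tpe) }
    else
      fields :+ (rowtimeName -> Rowtime)
}
```

Replacing in place keeps the field positions stable, so downstream field indexes do not shift.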
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124599#comment-16124599 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/4488#discussion_r132817997

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1247,27 +1258,32 @@ abstract class CodeGenerator(
 }
 }
- private[flink] def generateRecordTimestamp(isEventTime: Boolean): GeneratedExpression = {
+ private[flink] def generateRowtimeAccess(): GeneratedExpression = {
 val resultTerm = newName("result")
-val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+val nullTerm = newName("isNull")
-val resultCode = if (isEventTime) {
+val accessCode = s"""
-|$resultTypeTerm $resultTerm;
-|if ($contextTerm.timestamp() == null) {
+|Long $resultTerm = $contextTerm.timestamp();
+|if ($resultTerm == null) {
 | throw new RuntimeException("Rowtime timestamp is null. Please make sure that a proper " +
 |"TimestampAssigner is defined and the stream environment uses the EventTime time " +
 |"characteristic.");
 |}
-|else {
-| $resultTerm = $contextTerm.timestamp();
-|}
-|""".stripMargin
-} else {
+|boolean $nullTerm = false;
+ """.stripMargin
+
+GeneratedExpression(resultTerm, nullTerm, accessCode, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
+ }
+
+ private[flink] def generateProctimeTimestamp(): GeneratedExpression = {
+val resultTerm = newName("result")
+val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+val resultCode = s"""
 |$resultTypeTerm $resultTerm = $contextTerm.timerService().currentProcessingTime();
--- End diff --

Why not hardcode `$resultTypeTerm` as `long`? `currentProcessingTime()` always returns a primitive `long`.
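Hardcoding the type means emitting a primitive `long` directly instead of looking up the type term. The sketch below shows such a generator under simplifying assumptions. `newName` is a hypothetical stand-in for the code generator's name helper, and a plain `(resultTerm, code)` pair replaces Flink's `GeneratedExpression`.

```scala
object ProctimeCodegenSketch {
  private var nameCounter = 0

  // Hypothetical stand-in for CodeGenerator.newName: produces unique variable names.
  def newName(prefix: String): String = {
    nameCounter += 1
    s"${prefix}_$nameCounter"
  }

  /** Returns the result variable and the Java snippet that reads the processing time.
    * currentProcessingTime() returns a primitive long, so the type is hardcoded. */
  def generateProctimeTimestamp(contextTerm: String): (String, String) = {
    val resultTerm = newName("result")
    val code = s"long $resultTerm = $contextTerm.timerService().currentProcessingTime();"
    (resultTerm, code)
  }
}
```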
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124602#comment-16124602 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/4488#discussion_r132819009

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala ---
@@ -19,7 +19,9 @@ package org.apache.flink.table.runtime
 import java.lang.{Boolean => JBool}
+import java.sql.Timestamp
+import org.apache.calcite.runtime.SqlFunctions
--- End diff --

Remove unused imports.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124588#comment-16124588 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4532#discussion_r132819240

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
+val rowtimeFields = logicalType
+ .getFieldList.asScala
+ .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+ throw new TableException(
--- End diff --

Maybe we should treat the `SELECT *` as a special case with no following time indicator.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124586#comment-16124586 ]
ASF GitHub Bot commented on FLINK-7337:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4532#discussion_r132819174

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
+val rowtimeFields = logicalType
+ .getFieldList.asScala
+ .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+ throw new TableException(
--- End diff --

This is a very good question. I don't know how we want to solve the `SELECT *` problem. My initial idea was to just use the attribute of the left table as the time indicator. Another idea would be to remove all time indicators and explicitly choose one attribute, e.g. `SELECT TIME(OrderA.rowtime)`. Neither is perfect. We need to discuss this. @fhueske what do you think?
> ** Hold the event-time timestamp as regular Long field in Row > ** Represent the processing-time indicator type as a null-valued field in Row > (1 bit overhead) > * Remove materialization of event-time timestamps because timestamp is > already accessible in Row. > * Add {{ProcessFunction}} to set timestamp into the timestamp field of a > {{StreamRecord}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124556#comment-16124556 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132817252

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
     // get CRow plan
     val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
+    val rowtimeFields = logicalType
+      .getFieldList.asScala
+      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+    // convert the input type for the conversion mapper
+    // the input will be changed in the OutputRowtimeProcessFunction later
+    val convType = if (rowtimeFields.size > 1) {
+      throw new TableException(
--- End diff --

Thanks for the PR, @fhueske and @twalthr. I tried to rebase my rowtime join code on this branch, but encountered this exception. The test SQL is `SELECT * FROM OrderA, OrderB WHERE OrderA.productA = OrderB.productB AND OrderB.rtB BETWEEN OrderA.rtA AND OrderA.rtA + INTERVAL '2' SECOND`. What should I do to *cast all other fields to TIMESTAMP*?
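The rule behind this exception can be illustrated with a small, self-contained model. This is a sketch under assumptions, not Flink's implementation: `OutField`, `RowtimeCheck`, `rowtimeIndex`, and `castToTimestamp` are hypothetical names, and the joined schema is simplified to four fields. It assumes a result row may carry at most one rowtime indicator (a stream record has a single timestamp slot), and it models `CAST(x AS TIMESTAMP)` as turning an indicator into a plain timestamp field.

```scala
// Hypothetical, simplified model of a query's output schema (not Flink's API).
final case class OutField(name: String, isRowtimeIndicator: Boolean)

object RowtimeCheck {

  /** Index of the single rowtime indicator, if any; fails when there is more than one. */
  def rowtimeIndex(fields: Seq[OutField]): Option[Int] = {
    val rowtimeIdxs = fields.zipWithIndex.collect {
      case (f, i) if f.isRowtimeIndicator => i
    }
    if (rowtimeIdxs.size > 1) {
      val names = rowtimeIdxs.map(i => fields(i).name).mkString(", ")
      throw new IllegalArgumentException(
        s"Found more than one rowtime field: [$names]. " +
          "Cast all but one of them to TIMESTAMP.")
    }
    rowtimeIdxs.headOption
  }

  /** Models CAST(field AS TIMESTAMP): same value, but no longer a time indicator. */
  def castToTimestamp(fields: Seq[OutField], name: String): Seq[OutField] =
    fields.map(f => if (f.name == name) f.copy(isRowtimeIndicator = false) else f)
}

object RowtimeCheckDemo {
  def main(args: Array[String]): Unit = {
    // Simplified output of the SELECT * join: rtA and rtB are both rowtime indicators.
    val joined = Seq(
      OutField("productA", isRowtimeIndicator = false),
      OutField("rtA", isRowtimeIndicator = true),
      OutField("productB", isRowtimeIndicator = false),
      OutField("rtB", isRowtimeIndicator = true))

    println(scala.util.Try(RowtimeCheck.rowtimeIndex(joined)).isFailure)             // true
    println(RowtimeCheck.rowtimeIndex(RowtimeCheck.castToTimestamp(joined, "rtB"))) // Some(1)
  }
}
```

In this model, casting `rtB` leaves `rtA` as the only indicator, so the check passes and reports its position.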
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124548#comment-16124548 ]

ASF GitHub Bot commented on FLINK-7337:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/4532

[FLINK-7337] [table] Refactor internal handling of time indicator attributes and efficiency

## What is the purpose of the change

*This PR is an improvement of @fhueske's PR #4488. For a description, see #4488. This PR improves efficiency by representing timestamps as Long values instead of creating an object for every timestamp, and by serializing them with the LongSerializer. It also contains code clean-up changes.*

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-7337

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4532.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4532

commit 03ca69505e14aea5452a1b6e77d942ecc2440de4
Author: Fabian Hueske
Date: 2017-08-04T00:20:56Z

    [FLINK-7337] [table] Refactor internal handling of time indicator attributes.

    - Expand phyiscal Row schema for time indicators.
    - Refactor computation of logical schema of tables to import.
    - Refactor operators to use time attribute in Row instead of StreamRecord timestamp.

commit 876369d41019b3ec5ba824553b31fb2f3b44a18d
Author: Fabian Hueske
Date: 2017-08-07T21:39:48Z

    Addressed review feedback

commit 54bfed8debbe29784e0de7f07ebb277df68a4eb5
Author: Fabian Hueske
Date: 2017-08-11T14:43:07Z

    minor improvement

commit b0b24011e7c0444e3e1ebaba810edc06e9c85ad6
Author: twalthr
Date: 2017-08-12T11:51:42Z

    Efficient handling of rowtime timestamps
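The efficiency idea in this PR description, keeping rowtime values as primitive Long values instead of allocating a timestamp object per record, can be sketched in plain Scala. This is an illustration under assumptions: `RowtimeAsLong` and its methods are hypothetical, `DataOutputStream.writeLong` stands in for a Long serializer, and conversion to `java.sql.Timestamp` is shown only at the boundary where results leave the runtime.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.sql.Timestamp

// Hypothetical helper: rowtime stays an epoch-millisecond Long inside the runtime.
object RowtimeAsLong {

  /** Writes the rowtime as a fixed-width 8-byte long; no Timestamp object is allocated. */
  def serialize(rowtime: Long): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(rowtime)
    out.flush()
    bytes.toByteArray
  }

  /** Reads the 8-byte long back. */
  def deserialize(bytes: Array[Byte]): Long =
    new DataInputStream(new ByteArrayInputStream(bytes)).readLong()

  /** Boundary conversions, e.g. when a result is handed to user code as java.sql.Timestamp. */
  def toExternal(rowtime: Long): Timestamp = new Timestamp(rowtime)
  def toInternal(ts: Timestamp): Long = ts.getTime
}
```

For example, `RowtimeAsLong.serialize(1502538702000L)` produces 8 bytes, and `deserialize` returns the same Long; only `toExternal` creates an object.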
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119770#comment-16119770 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4488

@wuchong No problem. Thursday is fine.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118336#comment-16118336 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4488

@twalthr I'm working on other issues until Thursday, so I would like to have a look at it on Thursday (Beijing time). But if you are in a hurry, I'm fine with merging this first.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118222#comment-16118222 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4488

@wuchong @sunjincheng121 @shaoxuan-wang do you also want to take a look at it? Otherwise I would merge this and work on the follow-up issue for more efficiency.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118198#comment-16118198 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4488

I'm fine with a follow-up issue. +1 to merge this.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118114#comment-16118114 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4488

Thanks for the review @twalthr. I addressed your comments and updated the PR.

I think it would be very good to handle the timestamps internally as longs. The change seems to be a bit more involved because we need to touch the serialization logic and deal with various type conversion and code generation issues. I'd rather do this as a follow-up to this PR. What do you think?

Fabian
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117302#comment-16117302 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131764752

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---
@@ -415,7 +406,13 @@ class RexTimeIndicatorMaterializer(
       case _ =>
         updatedCall.getOperands.map { o =>
           if (isTimeIndicatorType(o.getType)) {
-            rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+            if (isRowtimeIndicatorType(o.getType)) {
--- End diff --

Not sure. The duplicated code is spread over 2 classes and parameterized differently. I tried to add a companion object with the method to `RexTimeIndicatorMaterializer`, but this did not really improve the code, IMO.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117253#comment-16117253 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131761767

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala ---
@@ -18,42 +18,54 @@
 package org.apache.flink.table.runtime
 
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
 import org.slf4j.LoggerFactory
 
 /**
-  * MapRunner with [[CRow]] output.
+  * ProcessRunner with [[CRow]] output.
   */
-class CRowOutputMapRunner(
+class CRowOutputProcessRunner(
     name: String,
     code: String,
     @transient var returnType: TypeInformation[CRow])
-  extends RichMapFunction[Any, CRow]
+  extends ProcessFunction[Any, CRow]
   with ResultTypeQueryable[CRow]
-  with Compiler[MapFunction[Any, Row]] {
+  with Compiler[ProcessFunction[Any, Row]] {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
-  private var function: MapFunction[Any, Row] = _
-  private var outCRow: CRow = _
+  private var function: ProcessFunction[Any, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
 
   override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
     val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating MapFunction.")
     function = clazz.newInstance()
-    outCRow = new CRow(null, true)
+
+    this.cRowWrapper = new CRowWrappingCollector()
+    this.cRowWrapper.setChange(true)
   }
 
-  override def map(in: Any): CRow = {
-    outCRow.row = function.map(in)
-    outCRow
+  override def processElement(
+      in: Any,
+      ctx: ProcessFunction[Any, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    // remove timestamp from stream record
+    val tc = out.asInstanceOf[TimestampedCollector[_]]
--- End diff --

It is not strictly required, but it reduces the serialization overhead by one Long value. I added this to most functions that introduce a timestamp (ProcessFunction), but it would also be OK to remove it.
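The trade-off in this comment can be modeled without Flink. In the sketch below, `Record`, `ModelCollector`, and `RecordSize` are hypothetical stand-ins rather than Flink's `StreamRecord` or `TimestampedCollector`; the model assumes a record envelope of a 1-byte tag plus an 8-byte Long whenever a timestamp is attached, so erasing the timestamp before emitting saves one Long per record.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of a stream record and a timestamp-aware collector (not Flink's classes).
final case class Record[T](value: T, timestamp: Option[Long])

final class ModelCollector[T] {
  private var currentTimestamp: Option[Long] = None
  val emitted: ArrayBuffer[Record[T]] = ArrayBuffer.empty[Record[T]]

  /** Attaches the given timestamp to all subsequently emitted records. */
  def setAbsoluteTimestamp(ts: Long): Unit = currentTimestamp = Some(ts)

  /** Drops the timestamp so that subsequently emitted records carry none. */
  def eraseTimestamp(): Unit = currentTimestamp = None

  def collect(value: T): Unit = emitted += Record(value, currentTimestamp)
}

object RecordSize {
  private val TagBytes = 1
  private val LongBytes = 8

  /** Envelope size in this model: a tag byte plus 8 bytes if a timestamp is attached. */
  def envelopeBytes(r: Record[_]): Int =
    TagBytes + (if (r.timestamp.isDefined) LongBytes else 0)
}
```

Collecting a value after `setAbsoluteTimestamp(1000L)` yields a 9-byte envelope in this model; collecting after `eraseTimestamp()` yields 1 byte. As the review question further down notes, a downstream rowtime operator would overwrite the timestamp anyway, so erasing is an optimization, not a correctness requirement.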
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117176#comment-16117176 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131750828

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
@@ -148,6 +142,20 @@ class DataStreamGroupWindowAggregate(
           "state size. You may specify a retention time of 0 to not clean up the state.")
     }
 
+    val timestampedInput = if (isRowtimeAttribute(window.timeAttribute)) {
+      // copy the window rowtime attribute into the StreamRecord timestamp field
+      val timeAttribute = window.timeAttribute.asInstanceOf[ResolvedFieldReference].name
+      val timeIdx = inputSchema.logicalFieldNames.indexOf(timeAttribute)
--- End diff --

Yes, in fact we can only do it like this because the logical and physical types are the same. Otherwise, we would need to adjust the index. I'll clean up `RowSchema` a bit more to remove the distinction between logical and physical types.
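The index lookup in this diff, and why it is only valid while the logical and physical row types coincide, can be sketched as follows. The names `TimestampedRow` and `TimestampSetter` are hypothetical; rows are modeled as `Array[Any]` with the rowtime attribute stored as a plain Long field, as proposed in FLINK-7337.

```scala
// Hypothetical model: a row is an Array[Any] and its rowtime attribute is a plain Long field.
final case class TimestampedRow(row: Array[Any], timestamp: Long)

object TimestampSetter {

  /**
   * Resolves the rowtime attribute by its logical name. The result is only a valid
   * physical index because logical and physical field order are identical.
   */
  def rowtimeIndex(logicalFieldNames: Seq[String], rowtimeAttribute: String): Int = {
    val idx = logicalFieldNames.indexOf(rowtimeAttribute)
    require(idx >= 0, s"Unknown rowtime attribute: $rowtimeAttribute")
    idx
  }

  /** Copies the rowtime value of the row into the record timestamp. */
  def setTimestamp(row: Array[Any], timeIdx: Int): TimestampedRow =
    row(timeIdx) match {
      case ts: Long => TimestampedRow(row, ts)
      case other    => throw new IllegalArgumentException(s"Rowtime field is not a Long: $other")
    }
}
```

For a schema `Seq("user", "amount", "rowtime")`, `rowtimeIndex` returns 2, and `setTimestamp(Array[Any]("bob", 3, 1000L), 2)` carries the timestamp 1000. If physical rows contained extra fields, the logical index would have to be remapped first.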
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117119#comment-16117119 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131743809

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -245,14 +244,18 @@ abstract class CodeGenerator(
       returnType: TypeInformation[_ <: Any],
       resultFieldNames: Seq[String])
     : GeneratedExpression = {
-    val input1AccessExprs = input1Mapping.map { idx =>
-      generateInputAccess(input1, input1Term, idx)
+    val input1AccessExprs = input1Mapping.map {
+      case -1 => generateStreamRecordTimestampAcccess()
+      case -2 => generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
+      case idx => generateInputAccess(input1, input1Term, idx)
     }
 
     val input2AccessExprs = input2 match {
       case Some(ti) =>
-        input2Mapping.map { idx =>
-          generateInputAccess(ti, input2Term, idx)
+        input2Mapping.map {
--- End diff --

No, they should not be used. I'll remove them.
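The marker values in the `input1Mapping` part of this diff (`-1` for the StreamRecord timestamp, `-2` for the null-valued processing-time indicator) amount to a small dispatch, sketched below. `Access`, `FieldAccess`, `RecordTimestampAccess`, `NullProctimeLiteral`, and `InputMapping` are hypothetical stand-ins for the generated access expressions, not the code generator's real types.

```scala
// Hypothetical stand-ins for the access expressions produced by the code generator.
sealed trait Access
final case class FieldAccess(idx: Int) extends Access
case object RecordTimestampAccess extends Access // read the StreamRecord timestamp
case object NullProctimeLiteral extends Access   // null literal for the proctime indicator

object InputMapping {
  val RowtimeMarker: Int = -1
  val ProctimeMarker: Int = -2

  /** Maps each entry of an input mapping to the kind of access it requires. */
  def resolve(mapping: Seq[Int]): Seq[Access] = mapping.map {
    case RowtimeMarker   => RecordTimestampAccess
    case ProctimeMarker  => NullProctimeLiteral
    case idx if idx >= 0 => FieldAccess(idx)
    case other           => throw new IllegalArgumentException(s"Unexpected index: $other")
  }
}
```

Naming the markers as constants (capitalized, so Scala treats them as stable identifiers in patterns) keeps the special cases explicit; any other negative index is rejected instead of being passed on as a field position.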
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117083#comment-16117083 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131739402

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---
@@ -172,45 +172,20 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     *
     * @param fieldNames field names
     * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
-    * @param rowtime optional system field to indicate event-time; the index determines the index
-    *                in the final record. If the index is smaller than the number of specified
-    *                fields, it shifts all following fields.
-    * @param proctime optional system field to indicate processing-time; the index determines the
-    *                 index in the final record. If the index is smaller than the number of
-    *                 specified fields, it shifts all following fields.
     * @return a struct type with the input fieldNames, input fieldTypes, and system fields
     */
   def buildLogicalRowType(
--- End diff --

I think it's still fine. It creates a row type for the logical (Calcite) plan. But I'm fine with changing the name if you have a better one.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117034#comment-16117034 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131706276

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---
@@ -172,45 +172,20 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     *
     * @param fieldNames field names
     * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
-    * @param rowtime optional system field to indicate event-time; the index determines the index
-    *                in the final record. If the index is smaller than the number of specified
-    *                fields, it shifts all following fields.
-    * @param proctime optional system field to indicate processing-time; the index determines the
-    *                 index in the final record. If the index is smaller than the number of
-    *                 specified fields, it shifts all following fields.
     * @return a struct type with the input fieldNames, input fieldTypes, and system fields
     */
   def buildLogicalRowType(
--- End diff --

Is the name of this method still correct?
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117032#comment-16117032 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131718272

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProcTimeMaterializationSqlFunction.scala ---
@@ -22,12 +22,12 @@ import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.validate.SqlMonotonicity
 
 /**
-  * Function that materializes a time attribute to the metadata timestamp. After materialization
-  * the result can be used in regular arithmetical calculations.
+  * Function that materializes a processing time attribute.
+  * After materialization the result can be used in regular arithmetical calculations.
   */
-object TimeMaterializationSqlFunction
+object ProcTimeMaterializationSqlFunction
--- End diff --

Should we shorten this to `ProctimeSqlFunction`?
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117049#comment-16117049 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131720129

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
@@ -148,6 +142,20 @@ class DataStreamGroupWindowAggregate(
           "state size. You may specify a retention time of 0 to not clean up the state.")
     }
+    val timestampedInput = if (isRowtimeAttribute(window.timeAttribute)) {
+      // copy the window rowtime attribute into the StreamRecord timestamp field
+      val timeAttribute = window.timeAttribute.asInstanceOf[ResolvedFieldReference].name
+      val timeIdx = inputSchema.logicalFieldNames.indexOf(timeAttribute)
+
+      inputDS
+        .process(
+          new TimestampSetterProcessFunction(timeIdx,CRowTypeInfo(inputSchema.physicalTypeInfo)))
--- End diff --

Missing space after the comma.
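The FLINK-7337 proposal has three parts:

* Keep the event-time timestamp as a regular Long field of the row.
* Represent the processing-time indicator as a null-valued field.
* Use a ProcessFunction to copy the rowtime field into the record timestamp.

Below is a minimal plain-Scala sketch of that copying step. It assumes a hypothetical `SimpleRecord` type standing in for Flink's `Row`/`StreamRecord` pair. It is not the actual `TimestampSetterProcessFunction`.

```scala
// Hypothetical stand-in for a row plus its record-level timestamp (NOT a Flink class).
final case class SimpleRecord(fields: Vector[Any], timestamp: Option[Long])

object TimestampSetterSketch {

  /** Copies the Long rowtime field at `timeIdx` into the record timestamp. */
  def setTimestamp(record: SimpleRecord, timeIdx: Int): SimpleRecord =
    record.fields(timeIdx) match {
      case ts: Long => record.copy(timestamp = Some(ts))
      case other =>
        throw new IllegalArgumentException(
          s"Expected a Long rowtime value at index $timeIdx, found: $other")
    }
}
```

In the usage below, field 1 plays the rowtime attribute. Field 2 is `null`, mimicking a proctime indicator that carries no value in the row.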
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117043#comment-16117043 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131728686

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
@@ -123,6 +124,8 @@ class ProcTimeBoundedRangeOver(
       return
     }
+    out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp()
--- End diff --

Do we really need these erasing steps in every function? Shouldn't a rowtime operator overwrite it anyway?
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117048#comment-16117048 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131714343

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -937,15 +935,40 @@ object ScalarOperators {
     }
   }
-  def generateConcat(
-      method: Method,
-      operands: Seq[GeneratedExpression]): GeneratedExpression = {
+  def generateConcat(operands: Seq[GeneratedExpression]): GeneratedExpression = {
-    generateCallIfArgsNotNull(false, STRING_TYPE_INFO, operands) {
-      (terms) =>s"${qualifyMethod(method)}(${terms.mkString(", ")})"
+    generateCallIfArgsNotNull(true, STRING_TYPE_INFO, operands) {
--- End diff --

The nullability should depend on the code generator's nullability.
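The boolean passed as the first argument to `generateCallIfArgsNotNull` in the diff decides whether null-handling code is generated. The sketch below uses a hypothetical, heavily simplified generator to show what such a flag typically switches on and off. `GeneratedExpr` and `callIfArgsNotNull` are illustrative names, not Flink's actual classes or signatures. The sketch also suggests why the flag would normally be forwarded from the generator's own setting rather than hard-coded.

```scala
// Hypothetical, simplified generated-expression holder (NOT Flink's GeneratedExpression).
final case class GeneratedExpr(code: String, resultTerm: String, nullTerm: String)

object NullCheckSketch {

  /**
   * Emits Java source for a String-returning call. Null-handling code is only
   * generated when `nullCheck` is true; otherwise operands are assumed non-null.
   */
  def callIfArgsNotNull(nullCheck: Boolean, operands: Seq[GeneratedExpr])(
      call: Seq[String] => String): GeneratedExpr = {
    val resultTerm = "result"
    val nullTerm = "isNull"
    val operandCode = operands.map(_.code).mkString("\n")
    val callCode = call(operands.map(_.resultTerm))

    if (nullCheck) {
      // null-aware path: the result is null if any operand is null
      val anyNull =
        if (operands.isEmpty) "false" else operands.map(_.nullTerm).mkString(" || ")
      val code =
        s"""$operandCode
           |boolean $nullTerm = $anyNull;
           |String $resultTerm = $nullTerm ? null : $callCode;""".stripMargin
      GeneratedExpr(code, resultTerm, nullTerm)
    } else {
      // null checks disabled: no null term is tracked for the result
      val code =
        s"""$operandCode
           |String $resultTerm = $callCode;""".stripMargin
      GeneratedExpr(code, resultTerm, "false")
    }
  }
}
```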
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117047#comment-16117047 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131708789

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -245,14 +244,18 @@ abstract class CodeGenerator(
       returnType: TypeInformation[_ <: Any],
       resultFieldNames: Seq[String])
     : GeneratedExpression = {
-    val input1AccessExprs = input1Mapping.map { idx =>
-      generateInputAccess(input1, input1Term, idx)
+    val input1AccessExprs = input1Mapping.map {
+      case -1 => generateStreamRecordTimestampAcccess()
+      case -2 => generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
+      case idx => generateInputAccess(input1, input1Term, idx)
     }
     val input2AccessExprs = input2 match {
       case Some(ti) =>
-        input2Mapping.map { idx =>
-          generateInputAccess(ti, input2Term, idx)
+        input2Mapping.map {
--- End diff --

Are there any use cases where these lines are needed?
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117037#comment-16117037 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131724363

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala ---
@@ -18,42 +18,54 @@
 package org.apache.flink.table.runtime
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
 import org.slf4j.LoggerFactory
 /**
-  * MapRunner with [[CRow]] output.
+  * ProcessRunner with [[CRow]] output.
   */
-class CRowOutputMapRunner(
+class CRowOutputProcessRunner(
     name: String,
     code: String,
     @transient var returnType: TypeInformation[CRow])
-  extends RichMapFunction[Any, CRow]
+  extends ProcessFunction[Any, CRow]
   with ResultTypeQueryable[CRow]
-  with Compiler[MapFunction[Any, Row]] {
+  with Compiler[ProcessFunction[Any, Row]] {
   val LOG = LoggerFactory.getLogger(this.getClass)
-  private var function: MapFunction[Any, Row] = _
-  private var outCRow: CRow = _
+  private var function: ProcessFunction[Any, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
   override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
     val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating MapFunction.")
--- End diff --

ProcessFunction
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117042#comment-16117042 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131704681

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -502,6 +500,68 @@ abstract class StreamTableEnvironment(
   }
   /**
+    * Injects markers for time indicator fields into the field indexes.
+    * A rowtime indicator is represented as -1, a proctime indicator as -2.
+    *
+    * @param fieldIndexes The field indexes into which the time indicators markers are injected.
+    * @param rowtime An optional rowtime indicator
+    * @param proctime An optional proctime indicator
+    * @return An adjusted array of field indexes.
+    */
+  private def adjustFieldIndexes(
+    fieldIndexes: Array[Int],
+    rowtime: Option[(Int, String)],
+    proctime: Option[(Int, String)]): Array[Int] = {
+
+    // inject rowtime field
+    val withRowtime = if (rowtime.isDefined) {
--- End diff --

We could use pattern matching here.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117046#comment-16117046 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131709106

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -719,12 +722,11 @@ abstract class CodeGenerator(
   override def visitCall(call: RexCall): GeneratedExpression = {
     // special case: time materialization
-    if (call.getOperator == TimeMaterializationSqlFunction) {
-      return generateRecordTimestamp(
-        FlinkTypeFactory.isRowtimeIndicatorType(call.getOperands.get(0).getType)
-      )
+    if (call.getOperator == ProcTimeMaterializationSqlFunction) {
+      return generateProcTimestamp()
     }
+
--- End diff --

Remove the empty line.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117036#comment-16117036 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131717969

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -937,15 +935,40 @@ object ScalarOperators {
     }
   }
-  def generateConcat(
-      method: Method,
-      operands: Seq[GeneratedExpression]): GeneratedExpression = {
+  def generateConcat(operands: Seq[GeneratedExpression]): GeneratedExpression = {
-    generateCallIfArgsNotNull(false, STRING_TYPE_INFO, operands) {
-      (terms) =>s"${qualifyMethod(method)}(${terms.mkString(", ")})"
+    generateCallIfArgsNotNull(true, STRING_TYPE_INFO, operands) {
+      (terms) =>s"${qualifyMethod(BuiltInMethods.CONCAT)}(${terms.mkString(", ")})"
     }
   }
+  def generateConcatWs(operands: Seq[GeneratedExpression]): GeneratedExpression = {
+
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val defaultValue = primitiveDefaultValue(Types.STRING)
+
+    val operatorCode =
+      s"""
+        |${operands.map(_.code).mkString("\n")}
+        |
+        |String $resultTerm;
+        |boolean $nullTerm;
+        |if (${operands.head.nullTerm}) {
+        | $nullTerm = true;
+        | $resultTerm = $defaultValue;
+        |} else {
+        |
+        | ${operands.tail.map(o => s"if (${o.nullTerm}) ${o.resultTerm} = null;").mkString("\n")}
--- End diff --

I would not reassign a `resultTerm`. Actually, they should be declared `final` for optimization. We should do this in the near future.
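As documented for Flink SQL, the two generated functions have these semantics:

* CONCAT returns NULL if any argument is NULL.
* CONCAT_WS returns NULL if the separator is NULL, and otherwise skips NULL arguments.

Below is a plain-Scala sketch of those semantics. The `ConcatSketch` object is hypothetical and is not the generated Java code. In the spirit of the review comment, it filters NULL operands into a new collection instead of reassigning any operand's result.

```scala
object ConcatSketch {

  /** CONCAT: the result is null if any argument is null. */
  def concat(args: String*): String =
    if (args.exists(_ == null)) null else args.mkString

  /**
   * CONCAT_WS: null if the separator is null; otherwise null arguments are
   * skipped. The operands are filtered, never reassigned in place.
   */
  def concatWs(separator: String, args: String*): String =
    if (separator == null) null
    else args.filter(_ != null).mkString(separator)
}
```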
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117039#comment-16117039 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131726136

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -1136,25 +1142,31 @@ object AggregateUtil {
     }
   }
-  private[flink] def computeWindowStartEndPropertyPos(
-      properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int]) = {
+  private[flink] def computeWindowPropertyPos(
+      properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int], Option[Int]) = {
-    val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) {
+    val propPos = properties.foldRight(
+      (None: Option[Int], None: Option[Int], None: Option[Int], 0)) {
       (p, x) => p match {
--- End diff --

Could you rename `x` to (a, b, c)? It is hard to read things like `(x._1, Some(x._4), x._3, x._4 - 1)`.
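Following the suggestion to name the accumulator fields, the fold can destructure the tuple directly in the `case` pattern instead of reading `x._1` ... `x._4`. This is a minimal sketch with hypothetical, simplified property types. `WindowStart`, `WindowEnd` and `WindowRowtime` here are stand-ins, not Flink's `NamedWindowProperty` classes. As with the decrementing counter in the diff, positions are counted backwards from the last property: 0 is the last, -1 the one before it, and so on.

```scala
// Hypothetical stand-ins for the window property types.
sealed trait WindowProperty
case object WindowStart extends WindowProperty
case object WindowEnd extends WindowProperty
case object WindowRowtime extends WindowProperty

object WindowPropertyPosSketch {

  /** Returns the (start, end, rowtime) positions as backwards offsets. */
  def computeWindowPropertyPos(
      properties: Seq[WindowProperty]): (Option[Int], Option[Int], Option[Int]) = {
    val (startPos, endPos, rowtimePos, _) =
      properties.foldRight((Option.empty[Int], Option.empty[Int], Option.empty[Int], 0)) {
        // destructure the accumulator into named parts instead of x._1 ... x._4
        case (WindowStart, (_, e, t, i)) => (Some(i), e, t, i - 1)
        case (WindowEnd, (s, _, t, i)) => (s, Some(i), t, i - 1)
        case (WindowRowtime, (s, e, _, i)) => (s, e, Some(i), i - 1)
      }
    (startPos, endPos, rowtimePos)
  }
}
```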
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117035#comment-16117035 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131720018

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
@@ -148,6 +142,20 @@ class DataStreamGroupWindowAggregate(
           "state size. You may specify a retention time of 0 to not clean up the state.")
     }
+    val timestampedInput = if (isRowtimeAttribute(window.timeAttribute)) {
+      // copy the window rowtime attribute into the StreamRecord timestamp field
+      val timeAttribute = window.timeAttribute.asInstanceOf[ResolvedFieldReference].name
+      val timeIdx = inputSchema.logicalFieldNames.indexOf(timeAttribute)
--- End diff --

I thought we could get rid of the logical/physical distinction?
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117045#comment-16117045 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131724882

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala ---
@@ -18,42 +18,54 @@
 package org.apache.flink.table.runtime
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
 import org.slf4j.LoggerFactory
 /**
-  * MapRunner with [[CRow]] output.
+  * ProcessRunner with [[CRow]] output.
   */
-class CRowOutputMapRunner(
+class CRowOutputProcessRunner(
     name: String,
     code: String,
     @transient var returnType: TypeInformation[CRow])
-  extends RichMapFunction[Any, CRow]
+  extends ProcessFunction[Any, CRow]
   with ResultTypeQueryable[CRow]
-  with Compiler[MapFunction[Any, Row]] {
+  with Compiler[ProcessFunction[Any, Row]] {
   val LOG = LoggerFactory.getLogger(this.getClass)
-  private var function: MapFunction[Any, Row] = _
-  private var outCRow: CRow = _
+  private var function: ProcessFunction[Any, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
   override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
     val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating MapFunction.")
     function = clazz.newInstance()
-    outCRow = new CRow(null, true)
+
+    this.cRowWrapper = new CRowWrappingCollector()
+    this.cRowWrapper.setChange(true)
   }
-  override def map(in: Any): CRow = {
-    outCRow.row = function.map(in)
-    outCRow
+  override def processElement(
+      in: Any,
+      ctx: ProcessFunction[Any, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    // remove timestamp from stream record
+    val tc = out.asInstanceOf[TimestampedCollector[_]]
--- End diff --

Do we need this change? It is not executed if `org.apache.flink.table.plan.nodes.CommonScan#needsConversion` returns false.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117044#comment-16117044 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131723377

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala ---
@@ -90,3 +95,35 @@ class CRowInputScalaTupleOutputMapRunner(
   override def getProducedType: TypeInformation[(Boolean, Any)] = returnType
 }
+
+/**
+  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
+  */
+class WrappingTimestampSetterProcessFunction[OUT](
--- End diff --

Move this to a separate file or rename the file.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117038#comment-16117038 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131712146

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -245,14 +244,18 @@ abstract class CodeGenerator(
       returnType: TypeInformation[_ <: Any],
       resultFieldNames: Seq[String])
     : GeneratedExpression = {
-    val input1AccessExprs = input1Mapping.map { idx =>
-      generateInputAccess(input1, input1Term, idx)
+    val input1AccessExprs = input1Mapping.map {
+      case -1 => generateStreamRecordTimestampAcccess()
+      case -2 => generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
--- End diff --

Can we use constants for these special indices?
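One way to address the request for constants relies on how Scala patterns treat identifiers:

* An identifier starting with an upper-case letter is a stable identifier, so a `case` clause matches against its value.
* A lower-case name would instead bind a fresh variable and match everything.

Named constants can therefore replace the literal `-1` and `-2` cases directly. The sketch below uses hypothetical names (`RowtimeMarker`, `ProctimeMarker`, `describeAccess`) and simplifies the generated access expressions to strings.

```scala
object TimeIndicatorMarkers {
  // Hypothetical names for the special indices used as literals in the diff.
  final val RowtimeMarker = -1
  final val ProctimeMarker = -2
}

object InputAccessSketch {
  import TimeIndicatorMarkers._

  /** Describes how each mapped input field would be accessed. */
  def describeAccess(mapping: Seq[Int]): Seq[String] = mapping.map {
    // upper-case identifiers are matched as constants, not bound as new variables
    case RowtimeMarker => "record timestamp"
    case ProctimeMarker => "null (proctime indicator)"
    case idx => s"field $idx"
  }
}
```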
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117041#comment-16117041 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131724317

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala ---
@@ -18,42 +18,54 @@
 package org.apache.flink.table.runtime
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
 import org.slf4j.LoggerFactory
 /**
-  * MapRunner with [[CRow]] output.
+  * ProcessRunner with [[CRow]] output.
   */
-class CRowOutputMapRunner(
+class CRowOutputProcessRunner(
     name: String,
     code: String,
     @transient var returnType: TypeInformation[CRow])
-  extends RichMapFunction[Any, CRow]
+  extends ProcessFunction[Any, CRow]
   with ResultTypeQueryable[CRow]
-  with Compiler[MapFunction[Any, Row]] {
+  with Compiler[ProcessFunction[Any, Row]] {
   val LOG = LoggerFactory.getLogger(this.getClass)
-  private var function: MapFunction[Any, Row] = _
-  private var outCRow: CRow = _
+  private var function: ProcessFunction[Any, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
   override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
--- End diff --

ProcessFunction
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117033#comment-16117033 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131704708

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -502,6 +500,68 @@ abstract class StreamTableEnvironment(
   }
   /**
+    * Injects markers for time indicator fields into the field indexes.
+    * A rowtime indicator is represented as -1, a proctime indicator as -2.
+    *
+    * @param fieldIndexes The field indexes into which the time indicators markers are injected.
+    * @param rowtime An optional rowtime indicator
+    * @param proctime An optional proctime indicator
+    * @return An adjusted array of field indexes.
+    */
+  private def adjustFieldIndexes(
+    fieldIndexes: Array[Int],
+    rowtime: Option[(Int, String)],
+    proctime: Option[(Int, String)]): Array[Int] = {
+
+    // inject rowtime field
+    val withRowtime = if (rowtime.isDefined) {
+      fieldIndexes.patch(rowtime.get._1, Seq(-1), 0) // -1 indicates rowtime
+    } else {
+      fieldIndexes
+    }
+
+    // inject proctime field
+    val withProctime = if (proctime.isDefined) {
--- End diff --

we could use pattern matching here
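Following the review suggestion, here is a minimal sketch of `adjustFieldIndexes` that matches on the `Option` values instead of calling `isDefined`/`get`. It is written as a standalone object for illustration; the object name and the marker constants are assumptions, while the -1/-2 marker values come from the ScalaDoc in the diff.

```scala
// Hypothetical standalone variant of adjustFieldIndexes using pattern matching.
object TimeIndicatorIndexes {

  // Marker values as documented in the ScalaDoc of the diff.
  val RowtimeMarker: Int = -1
  val ProctimeMarker: Int = -2

  def adjustFieldIndexes(
      fieldIndexes: Array[Int],
      rowtime: Option[(Int, String)],
      proctime: Option[(Int, String)]): Array[Int] = {

    // inject the rowtime marker at its position, if a rowtime attribute is declared
    val withRowtime = rowtime match {
      case Some((idx, _)) => fieldIndexes.patch(idx, Seq(RowtimeMarker), 0)
      case None => fieldIndexes
    }

    // inject the proctime marker at its position, if a proctime attribute is declared
    proctime match {
      case Some((idx, _)) => withRowtime.patch(idx, Seq(ProctimeMarker), 0)
      case None => withRowtime
    }
  }
}
```

For example, `adjustFieldIndexes(Array(0, 1), Some((1, "rt")), Some((3, "pt")))` first yields `[0, -1, 1]` and then appends the proctime marker at position 3.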
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117027#comment-16117027 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131704170

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -502,6 +500,68 @@ abstract class StreamTableEnvironment(
   }
   /**
+    * Injects markers for time indicator fields into the field indexes.
+    * A rowtime indicator is represented as -1, a proctime indicator as -2.
+    *
+    * @param fieldIndexes The field indexes into which the time indicators markers are injected.
+    * @param rowtime An optional rowtime indicator
+    * @param proctime An optional proctime indicator
+    * @return An adjusted array of field indexes.
+    */
+  private def adjustFieldIndexes(
+    fieldIndexes: Array[Int],
+    rowtime: Option[(Int, String)],
+    proctime: Option[(Int, String)]): Array[Int] = {
+
+    // inject rowtime field
+    val withRowtime = if (rowtime.isDefined) {
+      fieldIndexes.patch(rowtime.get._1, Seq(-1), 0) // -1 indicates rowtime
+    } else {
+      fieldIndexes
+    }
+
+    // inject proctime field
+    val withProctime = if (proctime.isDefined) {
+      withRowtime.patch(proctime.get._1, Seq(-2), 0) // -2 indicates proctime
+    } else {
+      withRowtime
+    }
+
+    withProctime
+  }
+
+  /**
+    * Injects names of time indicator fields into the list of field names.
+    *
+    * @param fieldNames The array of field names into which the time indicator field names are
+    *                   injected.
+    * @param rowtime An optional rowtime indicator
+    * @param proctime An optional proctime indicator
+    * @return An adjusted array of field names.
+    */
+  private def adjustFieldNames(
+    fieldNames: Array[String],
+    rowtime: Option[(Int, String)],
+    proctime: Option[(Int, String)]): Array[String] = {
+
+    // inject rowtime field
+    val withRowtime = if (rowtime.isDefined) {
+      fieldNames.patch(rowtime.get._1, Seq(rowtime.get._2), 0)
+    } else {
+      fieldNames
+    }
+
+    // inject proctime field
+    val withProctime = if (proctime.isDefined) {
--- End diff --

we could use pattern matching here
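The same suggestion applied to `adjustFieldNames`. One option, which is an assumption and not necessarily what the PR adopted, is to flatten both optional indicators and fold over them. This keeps the rowtime-then-proctime insertion order of the original code without any `get` calls.

```scala
// Hypothetical standalone variant of adjustFieldNames: flatten the optional
// (position, name) pairs and insert each name in order (rowtime first).
object TimeIndicatorNames {

  def adjustFieldNames(
      fieldNames: Array[String],
      rowtime: Option[(Int, String)],
      proctime: Option[(Int, String)]): Array[String] =
    Seq(rowtime, proctime).flatten.foldLeft(fieldNames) {
      case (names, (idx, name)) => names.patch(idx, Seq(name), 0)
    }
}
```

With `Array("a", "b")`, rowtime `(0, "rt")` and proctime `(2, "pt")`, the rowtime name is inserted first (`[rt, a, b]`) and the proctime name afterwards (`[rt, a, pt, b]`).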
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117025#comment-16117025 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131713099

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1104,6 +1106,19 @@ abstract class CodeGenerator(
     }
   }
+  private[flink] def generateStreamRecordTimestampAcccess(): GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+
+    val accessCode =
+      s"""
+        |long $resultTerm = $contextTerm.timestamp();
--- End diff --

This could lead to an NPE. I would reintroduce the exception from `generateRecordTimestamp`.
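The NPE risk comes from auto-unboxing. `ProcessFunction.Context#timestamp()` returns a boxed `java.lang.Long`, which can be `null` when no timestamp has been assigned, and assigning it to a primitive `long` unboxes it. The plain Scala sketch below models a guarded read. The object name and the exception message are illustrative assumptions, not Flink's actual generated code or wording.

```scala
// Sketch of a null-safe rowtime read. The parameter stands in for the boxed
// value returned by ProcessFunction.Context#timestamp().
object RowtimeAccess {

  def readRowtime(boxed: java.lang.Long): Long =
    if (boxed == null) {
      // Fail with a descriptive error instead of an NPE from unboxing.
      // The message text is illustrative only.
      throw new IllegalStateException(
        "Rowtime timestamp is null. Please make sure that a timestamp assigner is defined " +
          "and that the program uses event time.")
    } else {
      boxed.longValue()
    }
}
```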
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117040#comment-16117040 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131708260

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -245,14 +244,18 @@ abstract class CodeGenerator(
       returnType: TypeInformation[_ <: Any],
       resultFieldNames: Seq[String])
     : GeneratedExpression = {
-    val input1AccessExprs = input1Mapping.map { idx =>
-      generateInputAccess(input1, input1Term, idx)
+    val input1AccessExprs = input1Mapping.map {
--- End diff --

Please add some inline comments here and also in the ScalaDoc of `input1Mapping` and `input2Mapping`.
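To illustrate the kind of inline documentation requested, here is a small model of what each entry of `input1Mapping`/`input2Mapping` stands for once time indicator markers have been injected. It assumes the convention from the `adjustFieldIndexes` ScalaDoc in the same PR (-1 for rowtime, -2 for proctime, non-negative for a physical input field). The ADT and its names are hypothetical.

```scala
// Hypothetical model of the entries of an input mapping after marker injection.
object InputMappingSketch {

  sealed trait FieldAccess
  // -1: rowtime indicator (presumably read from the record timestamp during the initial conversion)
  case object RowtimeTimestamp extends FieldAccess
  // -2: proctime indicator (the current processing time)
  case object ProctimeTimestamp extends FieldAccess
  // >= 0: regular access to the physical input field at this index
  final case class InputField(index: Int) extends FieldAccess

  def describe(mapping: Array[Int]): Seq[FieldAccess] =
    mapping.toSeq.map {
      case -1 => RowtimeTimestamp
      case -2 => ProctimeTimestamp
      case idx if idx >= 0 => InputField(idx)
      case other => throw new IllegalArgumentException(s"Unknown field marker: $other")
    }
}
```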
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117031#comment-16117031 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131705390

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -677,12 +748,43 @@ abstract class StreamTableEnvironment(
     val rootParallelism = plan.getParallelism
-    conversion match {
-      case mapFunction: MapFunction[CRow, A] =>
-        plan.map(mapFunction)
-          .returns(tpe)
-          .name(s"to: ${tpe.getTypeClass.getSimpleName}")
-          .setParallelism(rootParallelism)
+    val rowtimeFields = logicalType.getFieldList.asScala
+      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+    if (rowtimeFields.isEmpty) {
+      // to rowtime field to set
+
+      conversion match {
+        case mapFunction: MapFunction[CRow, A] =>
+          plan.map(mapFunction)
+            .returns(tpe)
+            .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+            .setParallelism(rootParallelism)
+      }
+    } else if (rowtimeFields.size == 1) {
+      // set the only rowtime field as event-time timestamp for DataStream
+
+      val mapFunction = conversion match {
+        case mapFunction: MapFunction[CRow, A] => mapFunction
+        case _ => new MapFunction[CRow, A] {
+          override def map(cRow: CRow): A = cRow.asInstanceOf[A]
+        }
+      }
+
+      plan.process(
+        new WrappingTimestampSetterProcessFunction[A](
+          mapFunction,
+          rowtimeFields.head.getIndex))
+        .returns(tpe)
+        .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+        .setParallelism(rootParallelism)
+
+    } else {
+      throw new TableException(
+        s"Found more than one rowtime field: [${rowtimeFields.map(_.getName).mkString(", ")}] in " +
+          s"the table that should be converted to a DataStream.\n" +
+          s"Please select the rowtime field that should be used as event-time timestamp for the " +
+          s"DataStream by casting all other fields to TIMESTAMP or LONG.")
--- End diff --

I would just recommend TIMESTAMP. LONG is still not supported.
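The three branches of the conversion can be summarized as a pure decision. With no rowtime field, a plain map is used. With exactly one, its index is used to set the `StreamRecord` timestamp (via `WrappingTimestampSetterProcessFunction` in the diff). With more than one, the conversion fails. The sketch below models only that decision in plain Scala. `Field` and `selectRowtimeIndex` are hypothetical names, and the error message follows the review comment by recommending only TIMESTAMP.

```scala
// Hypothetical model of the rowtime-field selection when converting a Table to a DataStream.
object RowtimeSelection {

  // Simplified field descriptor: name, position in the row, and whether it is a rowtime indicator.
  final case class Field(name: String, index: Int, isRowtimeIndicator: Boolean)

  // Right(None): no timestamp to set; Right(Some(i)): copy field i into the record timestamp;
  // Left(msg): ambiguous, the user has to cast the other rowtime fields.
  def selectRowtimeIndex(fields: Seq[Field]): Either[String, Option[Int]] =
    fields.filter(_.isRowtimeIndicator) match {
      case Seq() => Right(None)
      case Seq(only) => Right(Some(only.index))
      case several =>
        Left(
          s"Found more than one rowtime field: [${several.map(_.name).mkString(", ")}] in " +
            "the table that should be converted to a DataStream.\n" +
            "Please select the rowtime field that should be used as event-time timestamp for the " +
            "DataStream by casting all other fields to TIMESTAMP.")
    }
}
```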
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117026#comment-16117026 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131704138

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -502,6 +500,68 @@ abstract class StreamTableEnvironment(
   }
   /**
+    * Injects markers for time indicator fields into the field indexes.
+    * A rowtime indicator is represented as -1, a proctime indicator as -2.
+    *
+    * @param fieldIndexes The field indexes into which the time indicators markers are injected.
+    * @param rowtime An optional rowtime indicator
+    * @param proctime An optional proctime indicator
+    * @return An adjusted array of field indexes.
+    */
+  private def adjustFieldIndexes(
+    fieldIndexes: Array[Int],
+    rowtime: Option[(Int, String)],
+    proctime: Option[(Int, String)]): Array[Int] = {
+
+    // inject rowtime field
+    val withRowtime = if (rowtime.isDefined) {
+      fieldIndexes.patch(rowtime.get._1, Seq(-1), 0) // -1 indicates rowtime
+    } else {
+      fieldIndexes
+    }
+
+    // inject proctime field
+    val withProctime = if (proctime.isDefined) {
+      withRowtime.patch(proctime.get._1, Seq(-2), 0) // -2 indicates proctime
+    } else {
+      withRowtime
+    }
+
+    withProctime
+  }
+
+  /**
+    * Injects names of time indicator fields into the list of field names.
+    *
+    * @param fieldNames The array of field names into which the time indicator field names are
+    *                   injected.
+    * @param rowtime An optional rowtime indicator
+    * @param proctime An optional proctime indicator
+    * @return An adjusted array of field names.
+    */
+  private def adjustFieldNames(
+    fieldNames: Array[String],
+    rowtime: Option[(Int, String)],
+    proctime: Option[(Int, String)]): Array[String] = {
+
+    // inject rowtime field
+    val withRowtime = if (rowtime.isDefined) {
--- End diff --

we could use pattern matching here
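Because `adjustFieldIndexes` and `adjustFieldNames` share the same structure, the pattern-matching version could also be factored into one generic helper. In the sketch below, the helper name `inject` and its curried value functions are assumptions for illustration. The `ClassTag` context bound is what lets `patch` build an `Array[T]` for a generic element type.

```scala
import scala.reflect.ClassTag

// Hypothetical generic helper: inserts one element per declared time indicator,
// rowtime first, then proctime, using pattern matching on the options.
object TimeIndicatorInjection {

  def inject[T: ClassTag](
      base: Array[T],
      rowtime: Option[(Int, String)],
      proctime: Option[(Int, String)])(
      rowtimeValue: ((Int, String)) => T,
      proctimeValue: ((Int, String)) => T): Array[T] = {
    val withRowtime = rowtime match {
      case Some(rt) => base.patch(rt._1, Seq(rowtimeValue(rt)), 0)
      case None => base
    }
    proctime match {
      case Some(pt) => withRowtime.patch(pt._1, Seq(proctimeValue(pt)), 0)
      case None => withRowtime
    }
  }

  // Field indexes: -1 marks rowtime, -2 marks proctime.
  def adjustFieldIndexes(idx: Array[Int], rt: Option[(Int, String)], pt: Option[(Int, String)]): Array[Int] =
    inject(idx, rt, pt)(_ => -1, _ => -2)

  // Field names: the declared attribute name is inserted.
  def adjustFieldNames(names: Array[String], rt: Option[(Int, String)], pt: Option[(Int, String)]): Array[String] =
    inject(names, rt, pt)(_._2, _._2)
}
```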
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117029#comment-16117029 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131703254

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -677,12 +748,43 @@ abstract class StreamTableEnvironment(
     val rootParallelism = plan.getParallelism
-    conversion match {
-      case mapFunction: MapFunction[CRow, A] =>
-        plan.map(mapFunction)
-          .returns(tpe)
-          .name(s"to: ${tpe.getTypeClass.getSimpleName}")
-          .setParallelism(rootParallelism)
+    val rowtimeFields = logicalType.getFieldList.asScala
+      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+    if (rowtimeFields.isEmpty) {
+      // to rowtime field to set
--- End diff --

typo?
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117030#comment-16117030 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131706849

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---
@@ -311,19 +323,19 @@ object RelTimeIndicatorConverter {
     var needsConversion = false
-    // materialize all remaining time indicators
+    // materialize remaining proc time indicators
--- End diff --

remove space
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117024#comment-16117024 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131707428

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---
@@ -415,7 +406,13 @@ class RexTimeIndicatorMaterializer(
       case _ =>
         updatedCall.getOperands.map { o =>
           if (isTimeIndicatorType(o.getType)) {
-            rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+            if (isRowtimeIndicatorType(o.getType)) {
--- End diff --

maybe we could move this code to a method, because it appears 3 times
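The suggested refactoring is a classic "extract method". The rowtime/proctime branching moves into one helper that all three call sites reuse. The sketch below uses a toy expression model instead of Calcite's `RexNode`/`RexBuilder` API. The function names inside `Call` (`CAST_TO_TIMESTAMP`, `PROCTIME_MATERIALIZE`) are hypothetical placeholders for whatever each branch actually builds.

```scala
// Toy model of the extracted helper; not Calcite's API.
object MaterializationSketch {

  sealed trait SqlType
  case object RowtimeIndicator extends SqlType
  case object ProctimeIndicator extends SqlType
  case object Timestamp extends SqlType
  case object Varchar extends SqlType

  sealed trait Expr { def tpe: SqlType }
  final case class Ref(name: String, tpe: SqlType) extends Expr
  final case class Call(fn: String, operand: Expr, tpe: SqlType) extends Expr

  // Single place for the rowtime/proctime distinction that appears three times in the diff.
  def materializeIfTimeIndicator(e: Expr): Expr = e.tpe match {
    case RowtimeIndicator => Call("CAST_TO_TIMESTAMP", e, Timestamp)     // hypothetical
    case ProctimeIndicator => Call("PROCTIME_MATERIALIZE", e, Timestamp) // hypothetical
    case _ => e
  }

  // One of the former call sites, now reduced to a map over the operands.
  def materializeOperands(operands: Seq[Expr]): Seq[Expr] =
    operands.map(materializeIfTimeIndicator)
}
```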
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117028#comment-16117028 ]

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r131709842

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1104,6 +1106,19 @@ abstract class CodeGenerator(
     }
   }
+  private[flink] def generateStreamRecordTimestampAcccess(): GeneratedExpression = {
--- End diff --

I would call this `generateRowtimeAccess` and locate it next to `generateProctimeTimestamp`. In any case remove the third `c`.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116520#comment-16116520 ]

ASF GitHub Bot commented on FLINK-7337:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/4488

[FLINK-7337] [table] Refactor internal handling of time indicator attributes.

## What is the purpose of the change

Handle time indicator attributes as physical fields in `Row`.

- support for multiple event-time attributes (required for proper join handling)
- no distinction between logical and physical schema

## Brief change log

- Expand physical Row schema for time indicators.
- Refactor computation of logical schema of tables to import.
- `StreamRecord` timestamps are moved into `Row` during initial conversion.
- Refactor operators to use time attribute in Row instead of StreamRecord timestamp.
- Timestamps are copied into `StreamRecord` when `Table` is converted into `DataStream`.
- Drive-by fix for NPE in `generateInputFieldUnboxing`.

## Verifying this change

- Changes are mostly internal.
- No new features or public APIs have been added.
- All existing tests pass.
- Tests have been added to verify that time indicators are copied when `Table` is converted into `DataStream`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **yes**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **n/a**

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableTimeIndi

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4488.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4488

commit accf80587b5a7455fb913b5860f973b3a738765a
Author: Fabian Hueske
Date: 2017-08-04T00:20:56Z

    [FLINK-7337] [table] Refactor internal handling of time indicator attributes.

    - Expand phyiscal Row schema for time indicators.
    - Refactor computation of logical schema of tables to import.
    - Refactor operators to use time attribute in Row instead of StreamRecord timestamp.
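The change log describes a round trip. On the way in, the record timestamp is moved into a regular `Long` field of the row, and processing time is represented by a `null` placeholder field (per the issue description). On the way out, the rowtime field is copied back into the record timestamp. The sketch below models this with plain Scala collections. `Record`, `toInternal` and `toExternal` are hypothetical stand-ins, not Flink's `StreamRecord`/`Row`/`CRow` classes.

```scala
// Hypothetical model of moving timestamps into the row and back.
object TimeIndicatorRoundTrip {

  // Stand-in for a stream record: row fields plus an optional record timestamp.
  final case class Record(fields: Vector[Any], timestamp: Option[Long])

  // Initial conversion: insert the record timestamp at rowtimeIdx and a null
  // placeholder for proctime at proctimeIdx, then clear the record timestamp.
  def toInternal(rec: Record, rowtimeIdx: Option[Int], proctimeIdx: Option[Int]): Record = {
    val withRowtime = rowtimeIdx match {
      case Some(i) =>
        val ts: Long = rec.timestamp.getOrElse(
          throw new IllegalStateException("rowtime requested but the record has no timestamp"))
        rec.fields.patch(i, Seq(ts), 0)
      case None => rec.fields
    }
    val withProctime = proctimeIdx match {
      case Some(i) => withRowtime.patch(i, Seq(null), 0)
      case None => withRowtime
    }
    Record(withProctime, None)
  }

  // Conversion back to a DataStream: copy the (single) rowtime field into the record timestamp.
  def toExternal(row: Record, rowtimeIdx: Int): Record =
    row.copy(timestamp = Some(row.fields(rowtimeIdx).asInstanceOf[Long]))
}
```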
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16115247#comment-16115247 ]

Xingcan Cui commented on FLINK-7337:

Thanks for the explanation, [~fhueske]. Everything's clear to me now.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114891#comment-16114891 ]

Fabian Hueske commented on FLINK-7337:
--

Hi [~xccui], no worries about your questions. They are all very valid.

1) The {{ProcessFunction}} copies the {{StreamRecord}} timestamp (this is the one set with the assigner) into the {{Row}} and removes the {{StreamRecord}} timestamp. So it's an exact copy. The watermarks are not affected by this. Watermarks are special records and are not directly exposed to a {{ProcessFunction}}.

2) We use the watermarks to trigger computations when the result is assumed to be complete. They are a mechanism to control out-of-order data. So, yes, we could compute results without watermarks, but then we would have to send many updates because we would not know at which point in time we had received enough data for a good first result. This is the approach of Kafka Streams.
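Point 2 can be illustrated with a toy event-time tumbling-window count. Records are only buffered, and a window's count is emitted once a watermark passes the window end, that is, when the result is assumed to be complete. Without watermarks, an operator would instead have to emit an updated count for every record. This is a simplified plain-Scala model (non-negative timestamps, window fires when `watermark >= windowEnd`), not Flink's window operator or its exact firing rule.

```scala
import scala.collection.mutable

// Toy event-time tumbling-window counter driven by watermarks.
object WatermarkSketch {

  final class TumblingCounter(windowSize: Long) {
    private val counts = mutable.Map.empty[Long, Int]

    // Buffer the record in the window it belongs to; nothing is emitted yet.
    def onElement(timestamp: Long): Unit = {
      val windowStart = timestamp - (timestamp % windowSize)
      counts.update(windowStart, counts.getOrElse(windowStart, 0) + 1)
    }

    // Emit (windowStart, count) for every window whose end the watermark has passed.
    def onWatermark(watermark: Long): Seq[(Long, Int)] = {
      val ready = counts.keys.filter(start => start + windowSize <= watermark).toList.sorted
      val results = ready.map(start => (start, counts(start)))
      ready.foreach(start => counts.remove(start))
      results
    }
  }
}
```

For example, with a window size of 10, elements at 1, 4 and 12, and watermarks 5, 10 and 25, the first watermark emits nothing, the second emits `(0, 2)` and the third emits `(10, 1)`.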
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114196#comment-16114196 ]

Xingcan Cui commented on FLINK-7337:

Hi [~fhueske], sorry for the silly questions. In my mind, to convert a DataStream to a Table, the timestamps and watermarks must be assigned in advance with an assigner. Then (1) what's the relationship between the timestamps assigned before and those set by the new {{ProcessFunction}}, and (2) could the timestamps dynamically set to the {{StreamRecord}} work without the watermarks?
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112054#comment-16112054 ]

Jark Wu commented on FLINK-7337:

+1 for the approach.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111066#comment-16111066 ]

Timo Walther commented on FLINK-7337:
-

That makes sense. +1 for this change. I also didn't like the index mapping solution to be honest.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111030#comment-16111030 ] Fabian Hueske commented on FLINK-7337: -- We would of course still have a logical and a physical schema. The logical schema is Calcite's {{RelDataType}} and the physical schema is based on Flink's {{TypeInformation}}. However, both would have the same fields, and we would not need a special translation that "filters" out time indicator fields and keeps track of their positions. Hence, we might be able to remove the {{RowSchema}} class (or keep it but significantly simplify its logic). Time indicator fields would be defined and identified as before. So we would still use {{.rowtime}} and {{.proctime}} when converting a {{DataStream}} into a {{Table}}, and the equivalent interfaces for {{TableSources}}. Fields which are declared as {{rowtime}} (or {{proctime}}) will have a special time indicator type (as right now). Any expression that is evaluated on such a field results in a regular timestamp type, i.e., the resulting value cannot be used as a time indicator. AFAIK, the current approach uses similar checks. So essentially, many things (including the API) remain as they are. We are "only" changing the internal representation of timestamp attributes.
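The typing rule described in this comment can be sketched as a tiny, hypothetical model. The {{FieldType}} enum and the helper methods below are illustrative assumptions, not Flink's actual {{RelDataType}} or {{TypeInformation}} classes: a field declared as {{.rowtime}} or {{.proctime}} keeps its time indicator type, while an expression such as {{rowtime - 100}} yields a regular timestamp that can no longer serve as a time indicator.

```java
/**
 * Hypothetical, simplified model of the typing rule for time indicators;
 * not Flink's actual classes. A field declared with .rowtime or .proctime keeps
 * a special indicator type, while any expression evaluated on it yields a plain TIMESTAMP.
 */
public class TimeIndicatorTyping {

    /** Hypothetical field types; only the two indicator kinds are special. */
    enum FieldType { ROWTIME_INDICATOR, PROCTIME_INDICATOR, TIMESTAMP, BIGINT }

    static boolean isTimeIndicator(FieldType type) {
        return type == FieldType.ROWTIME_INDICATOR || type == FieldType.PROCTIME_INDICATOR;
    }

    /**
     * Result type of an expression such as {@code rowtime - 100} on an operand of the given type:
     * an indicator is downgraded to a plain TIMESTAMP, other types pass through unchanged
     * (a simplification; real type inference depends on the operator).
     */
    static FieldType resultOfExpressionOn(FieldType operand) {
        return isTimeIndicator(operand) ? FieldType.TIMESTAMP : operand;
    }

    /** Hypothetical validation: only a time indicator may drive a time-based window. */
    static void validateWindowTimeField(FieldType type) {
        if (!isTimeIndicator(type)) {
            throw new IllegalArgumentException(type + " is not a time indicator");
        }
    }

    public static void main(String[] args) {
        FieldType rowtime = FieldType.ROWTIME_INDICATOR;
        // A direct reference to the declared field keeps its indicator type.
        validateWindowTimeField(rowtime);

        // An expression on the field yields a regular TIMESTAMP ...
        FieldType shifted = resultOfExpressionOn(rowtime);
        System.out.println(shifted);

        // ... which is rejected where a time indicator is required.
        try {
            validateWindowTimeField(shifted);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // A plain BIGINT (Long) field is never treated as a time indicator.
        System.out.println(isTimeIndicator(FieldType.BIGINT));
    }
}
```

Keeping the indicator information in the type, rather than in a separate position mapping, is what would let the logical and physical schemas contain the same fields.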
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110994#comment-16110994 ] Timo Walther commented on FLINK-7337: - In general, I like your approach (esp. the 1 bit overhead is a nice solution). My only question is: if we remove the separation of logical and physical row type, how do we track whether a field is a time indicator? This has to be defined either in the field data type or in the record data type. We cannot allow any Long field to act as a timestamp. How do we handle operations like {{rowtime - 100}}?
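To illustrate how the field type, rather than the value, could mark a field as a time indicator under the proposal, here is a hypothetical sketch of the row layout. The {{SCHEMA}} array, the {{FieldType}} enum, and the {{nullMask}} helper are assumptions for illustration, not Flink code: the row stores the event-time timestamp as a plain Long and leaves the processing-time slot null, so a serializer that records nulls with one bit per field would pay a single bit for it.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of the proposed row layout (not Flink's actual classes):
 * the schema's field types say which fields are time indicators, while the row
 * holds only plain values, with rowtime as a Long and proctime as a null placeholder.
 */
public class TimeIndicatorRowLayout {

    /** Hypothetical field types; the indicator information lives here, not in the values. */
    enum FieldType { VARCHAR, BIGINT, ROWTIME_INDICATOR, PROCTIME_INDICATOR }

    /** Example schema: (user VARCHAR, amount BIGINT, rowtime, proctime). */
    static final FieldType[] SCHEMA = {
        FieldType.VARCHAR, FieldType.BIGINT, FieldType.ROWTIME_INDICATOR, FieldType.PROCTIME_INDICATOR
    };

    /** Builds a row matching SCHEMA; the proctime slot carries no value. */
    static Object[] newRow(String user, long amount, long eventTimeMillis) {
        return new Object[] { user, amount, eventTimeMillis, null };
    }

    /** Index of the first field with the given type, or -1 if there is none. */
    static int indexOf(FieldType[] schema, FieldType type) {
        for (int i = 0; i < schema.length; i++) {
            if (schema[i] == type) {
                return i;
            }
        }
        return -1;
    }

    /** One bit per field, set when the field is null: the proctime slot costs a single bit. */
    static byte[] nullMask(Object[] row) {
        byte[] mask = new byte[(row.length + 7) / 8];
        for (int i = 0; i < row.length; i++) {
            if (row[i] == null) {
                mask[i / 8] |= (byte) (1 << (i % 8));
            }
        }
        return mask;
    }

    public static void main(String[] args) {
        Object[] row = newRow("alice", 42L, 1_500_000_000_000L);
        // The BIGINT field and the rowtime field both hold Longs; only the schema tells them apart.
        int rowtimeIdx = indexOf(SCHEMA, FieldType.ROWTIME_INDICATOR);
        int proctimeIdx = indexOf(SCHEMA, FieldType.PROCTIME_INDICATOR);
        System.out.println("rowtime = " + row[rowtimeIdx]);
        System.out.println("proctime slot is null: " + (row[proctimeIdx] == null));
        System.out.println("null mask: " + Arrays.toString(nullMask(row)));
    }
}
```

The sketch leaves out how an operator would obtain the actual processing time when the proctime field is accessed; it only shows where the indicator information and the values live.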
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110590#comment-16110590 ] Fabian Hueske commented on FLINK-7337: -- There is no "official" public API for that, but one can cast the {{Collector}} parameter of the {{ProcessFunction.processElement()}} method into a {{TimestampedCollector}} and set the timestamp of emitted records with {{TimestampedCollector.setAbsoluteTimestamp()}}. That's how it is done, for example, in {{RowTimeUnboundedOver}}.
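A minimal, self-contained sketch of the cast described in this comment follows. {{Collector}}, {{StreamRecord}}, and {{TimestampedCollector}} below are simplified stand-ins named after the Flink classes (not the real ones, whose internals differ), so the example runs without Flink on the classpath; the {{processElement}} helper and its {{rowtimeIdx}} parameter are hypothetical. It copies the event-time value held in a row field into the timestamp of the emitted record.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Self-contained sketch of the "cast the Collector" pattern. The nested types are
 * simplified stand-ins named after Flink classes, not the real implementations.
 */
public class TimestampedCollectorSketch {

    /** Stand-in for Flink's Collector: the only type a ProcessFunction sees. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Stand-in for a StreamRecord: a value plus the timestamp attached to it (null if none). */
    static final class StreamRecord<T> {
        final T value;
        final Long timestamp;

        StreamRecord(T value, Long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Stand-in for TimestampedCollector: wraps each emitted value with the current timestamp. */
    static final class TimestampedCollector<T> implements Collector<T> {
        private final List<StreamRecord<T>> emitted = new ArrayList<>();
        private Long timestamp;

        void setAbsoluteTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public void collect(T record) {
            emitted.add(new StreamRecord<>(record, timestamp));
        }

        List<StreamRecord<T>> emitted() {
            return emitted;
        }
    }

    /**
     * Hypothetical processElement body: copies the event-time value stored in a row field
     * into the timestamp of the emitted record. The downcast only works because the caller
     * passes a TimestampedCollector; it is not a public API contract.
     */
    static void processElement(Object[] row, int rowtimeIdx, Collector<Object[]> out) {
        TimestampedCollector<Object[]> timestampedOut = (TimestampedCollector<Object[]>) out;
        timestampedOut.setAbsoluteTimestamp((Long) row[rowtimeIdx]);
        timestampedOut.collect(row);
    }

    public static void main(String[] args) {
        TimestampedCollector<Object[]> out = new TimestampedCollector<>();
        processElement(new Object[] { "alice", 1000L }, 1, out);
        processElement(new Object[] { "bob", 2000L }, 1, out);
        for (StreamRecord<Object[]> record : out.emitted()) {
            System.out.println(record.value[0] + " @ " + record.timestamp);
        }
    }
}
```

The downcast ties the function to an implementation detail of whoever supplies the collector, which is why the comment stresses that there is no official public API for this.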
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110120#comment-16110120 ] Jark Wu commented on FLINK-7337: A question: how can the timestamp of a StreamRecord be set using a {{ProcessFunction}}? I didn't find a way for a {{ProcessFunction}} to access the timestamp field of a StreamRecord or to collect a record with a timestamp.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16109161#comment-16109161 ] Fabian Hueske commented on FLINK-7337: -- This change won't affect the handling of watermarks. It will only move the timestamp into the Row and refactor the internal handling of time indicators.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16109139#comment-16109139 ] Xingcan Cui commented on FLINK-7337: Hi [~fhueske], thanks for the JIRA. I just wonder if the watermarks will be set automatically along with timestamps.