[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16255301#comment-16255301 ]

ASF GitHub Bot commented on FLINK-8069:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5016

> Support empty watermark strategy for TableSources
> -------------------------------------------------
>
> Key: FLINK-8069
> URL: https://issues.apache.org/jira/browse/FLINK-8069
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Xingcan Cui
>
> In case the underlying data stream source emits watermarks, it should be
> possible to define an empty watermark strategy for rowtime attributes in the
> {{RowtimeAttributeDescriptor}}.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16255018#comment-16255018 ]

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5016

The PR looks good. I'll merge it.
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16254957#comment-16254957 ]

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5016

Hi @xccui, thanks for the update. Regarding your questions:

1. I think `null` should not be supported.
2. Such a `TimestampExtractor` is already available: [`StreamRecordTimestamp`](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala) :-)
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16254913#comment-16254913 ]

ASF GitHub Bot commented on FLINK-8069:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5016

Hi @fhueske, the PR has been updated. During the refactoring, two questions came up:

1. Do you think it's still necessary to allow the watermark strategy to be `null`?
2. Currently, we only provide an `ExistingField` extractor for the rowtime field. Since a `TableSource` may be created from an existing `DataStream`, I think we should also provide a corresponding extractor that automatically reads the timestamp from the `StreamRecord` and sets it as a field. What do you think?
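The second question contrasts two ways of obtaining the rowtime value. The Scala sketch below models that distinction with hypothetical, simplified types (`Record`, `RowtimeExtractor`, `FromExistingField` and `FromStreamRecord` are illustrative names, not Flink classes): one extractor reads a field of the row, the other reads the timestamp the runtime already carries alongside the record, which is the role the thread ascribes to `StreamRecordTimestamp`.

```scala
// Hypothetical, simplified model; these are NOT Flink's classes.
// A row as it travels through a DataStream: named fields plus the timestamp
// that the runtime keeps alongside the record (the StreamRecord timestamp).
final case class Record(fields: Map[String, Long], recordTimestamp: Option[Long])

sealed trait RowtimeExtractor {
  def extract(r: Record): Long
}

// Analogue of an ExistingField extractor: the rowtime is read from a field.
final case class FromExistingField(name: String) extends RowtimeExtractor {
  def extract(r: Record): Long = r.fields(name)
}

// Analogue of StreamRecordTimestamp: the row's fields are ignored and the
// timestamp already attached to the record upstream is used instead.
case object FromStreamRecord extends RowtimeExtractor {
  def extract(r: Record): Long =
    r.recordTimestamp.getOrElse(
      throw new IllegalStateException("record has no timestamp attached"))
}
```

In this model, a `TableSource` built from a `DataStream` whose records already carry timestamps would use `FromStreamRecord`, and no field of the row needs to hold the event time.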
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253370#comment-16253370 ]

ASF GitHub Bot commented on FLINK-8069:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5016

Thanks for the review, @fhueske. I think you are right: a dedicated `WatermarkStrategy` with watermark-preserving semantics would be better. I'll update the PR according to your comments. Thanks.
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253318#comment-16253318 ]

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5016#discussion_r151100405

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala ---
@@ -154,6 +154,40 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
   }

   @Test
+  def testRowtimeTableSourceWithoutWMStrategy(): Unit = {
--- End diff --

I'd be fine either way, but the test should ensure that watermarks are preserved.
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253317#comment-16253317 ]

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5016#discussion_r151096563

--- Diff: docs/dev/table/sourceSinks.md ---
@@ -316,7 +316,7 @@ A table schema field of type `SQL_TIMESTAMP` can be declared as rowtime attribut
 * the name of the field,
 * a `TimestampExtractor` that computes the actual value for the attribute (usually from one or more other attributes), and
-* a `WatermarkStrategy` that specifies how watermarks are generated for the the rowtime attribute.
+* a `WatermarkStrategy` that specifies how watermarks are generated for the the rowtime attribute (With the default value `null` to indicate no watermark should be generated).
--- End diff --

I would leave this part of the docs unchanged.
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253314#comment-16253314 ]

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5016#discussion_r151096958

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala ---
@@ -62,12 +62,13 @@ trait DefinedRowtimeAttributes {
   *
   * @param attributeName The name of the rowtime attribute.
   * @param timestampExtractor The timestamp extractor to derive the values of the attribute.
-  * @param watermarkStrategy The watermark strategy associated with the attribute.
+  * @param watermarkStrategy The watermark strategy associated with the attribute. With the default
--- End diff --

Please revert the changes in this class.
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253315#comment-16253315 ]

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5016#discussion_r151095005

--- Diff: docs/dev/table/sourceSinks.md ---
@@ -316,7 +316,7 @@ A table schema field of type `SQL_TIMESTAMP` can be declared as rowtime attribut
 * the name of the field,
 * a `TimestampExtractor` that computes the actual value for the attribute (usually from one or more other attributes), and
-* a `WatermarkStrategy` that specifies how watermarks are generated for the the rowtime attribute.
+* a `WatermarkStrategy` that specifies how watermarks are generated for the the rowtime attribute (With the default value `null` to indicate no watermark should be generated).
--- End diff --

I think we should not use `null` to indicate that no watermarks should be assigned. Instead, we should provide a `WatermarkStrategy` which indicates that existing watermarks are preserved, like:

```
// Marker strategy: keep the watermarks emitted by the underlying DataStream.
final class PreserveWatermarks extends WatermarkStrategy

object PreserveWatermarks {
  // Shared singleton instance.
  val INSTANCE: PreserveWatermarks = new PreserveWatermarks
}
```
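The argument for a dedicated strategy instead of `null` can be illustrated with a small, self-contained Scala model. This is a sketch under stated assumptions, not the actual `StreamTableSourceScan` logic: `Element`, `Rec`, `Watermark`, `Strategy`, `BoundedOutOfOrderness`, `Preserve` and `applyStrategy` are hypothetical names, and the assigner is simplified to emit a watermark after every record.

```scala
// Hypothetical, simplified model of how a table source scan might apply a
// watermark strategy; these are NOT Flink's classes.
sealed trait Element
final case class Rec(ts: Long) extends Element
final case class Watermark(ts: Long) extends Element

sealed trait Strategy
// Stand-in for an assigner: drops upstream watermarks and, for simplicity,
// emits (max timestamp seen - delay) after every record; a real periodic
// assigner would emit on a timer instead.
final case class BoundedOutOfOrderness(delay: Long) extends Strategy
// Stand-in for the proposed PreserveWatermarks: keep what the source emitted.
case object Preserve extends Strategy

def applyStrategy(in: Seq[Element], strategy: Strategy): Seq[Element] =
  strategy match {
    case Preserve => in // pass-through: the source's watermarks survive
    case BoundedOutOfOrderness(delay) =>
      var maxTs = Long.MinValue
      in.flatMap {
        case r @ Rec(ts) =>
          maxTs = math.max(maxTs, ts)
          Seq[Element](r, Watermark(maxTs - delay))
        case Watermark(_) => Seq.empty[Element] // upstream watermark discarded
      }
  }
```

Because `Strategy` is sealed, the compiler checks the match for exhaustiveness and the preserve behaviour is an explicit branch. A `null` strategy would match neither case and fail with a `MatchError` at runtime unless every caller added a separate null check.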
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253316#comment-16253316 ]

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5016#discussion_r151099552

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala ---
@@ -154,6 +154,40 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
   }

   @Test
+  def testRowtimeTableSourceWithoutWMStrategy(): Unit = {
--- End diff --

The test does not really check that watermarks are preserved. We would need a `TableSource` with a non-parallel `SourceFunction` that emits records and watermarks, similar to `TimeTestUtil.EventTimeSourceFunction`. There are two ways to ensure that the watermarks have been preserved:

1. Easy / not precise: we emit late data from the TableSource and validate that the late data is not included in the result.
2. More complex / precise: we run a simple select-project query on the table and convert the query result into an append `DataStream`. Finally, we check that all watermarks have been preserved using a custom operator (other functions do not have access to watermarks).
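Option 1 (emit late data and check that it is excluded) can be sketched as a self-contained Scala model. The names (`Event`, `Data`, `Wm`, `countPerWindow`) are hypothetical and the window logic is simplified (tumbling windows, no allowed lateness, non-negative timestamps); in the real ITCase, the windowing would be done by a Table API or SQL query on the `TableSource`.

```scala
// Hypothetical model of check (1); NOT Flink code.
// Counts records per tumbling event-time window and drops a record as late
// once the watermark has already passed the last timestamp of its window.
sealed trait Event
final case class Data(ts: Long) extends Event
final case class Wm(ts: Long) extends Event

def countPerWindow(events: Seq[Event], size: Long): Map[Long, Int] = {
  var watermark = Long.MinValue
  val counts = scala.collection.mutable.Map.empty[Long, Int]
  events.foreach {
    case Wm(ts) => watermark = math.max(watermark, ts)
    case Data(ts) =>
      val start = ts - (ts % size)          // window start (non-negative ts assumed)
      val maxTsInWindow = start + size - 1  // last timestamp belonging to the window
      // on time only if the watermark has not yet closed the window
      if (maxTsInWindow > watermark) counts(start) = counts.getOrElse(start, 0) + 1
  }
  counts.toMap
}
```

When a watermark of 10 arrives before the record with timestamp 4, that record is left out of window `[0, 5)`. If the same stream reached the window without that watermark, the record would be counted. This difference is what a test based on late data would assert on.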
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252931#comment-16252931 ]

ASF GitHub Bot commented on FLINK-8069:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5016

[FLINK-8069] [table] Support empty watermark strategy for TableSources

## What is the purpose of the change

This PR enables an empty watermark strategy for `RowtimeAttributeDescriptor`.

## Brief change log

- Add a default `null` value for the watermark strategy in `RowtimeAttributeDescriptor`.
- Add a case in `StreamTableSourceScan` for an empty watermark strategy.
- Add a test case and update related docs.

## Verifying this change

The change can be verified by the newly added `testRowtimeTableSourceWithoutWMStrategy()`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8069

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5016.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5016

commit ba0fc2777f2c81c582032124d301f5ee301ae009
Author: Xingcan Cui
Date: 2017-11-15T03:01:13Z

[FLINK-8069] [table] Support empty watermark strategy for TableSources
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252794#comment-16252794 ]

Xingcan Cui commented on FLINK-8069:

Thanks for the explanation, [~twalthr].
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251475#comment-16251475 ]

Timo Walther commented on FLINK-8069:
---

Suppose you have a TableSource like this one:

https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/table_java/sources/TaxiRideTableSource.java

where the custom TaxiRideSource already assigns timestamps and emits watermarks. When implementing {{DefinedRowtimeAttributes}}, we need to define a {{RowtimeAttributeDescriptor("eventTime", new StreamRecordTimestamp(), WatermarkStrategy???)}}. So this use case needs an explicit empty watermark strategy that basically does nothing.
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251458#comment-16251458 ]

Xingcan Cui commented on FLINK-8069:

Hi [~twalthr], could you explain the use case in a little more detail? That would be helpful. Thanks.
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251320#comment-16251320 ]

Timo Walther commented on FLINK-8069:
---

Hi [~xccui], we just had a use case where we were forced to supply some strategy even though our data source already emits watermarks. We could either make this explicit with a new {{StreamRecordWatermark}} class, similar to {{StreamRecordTimestamp}}, or simply allow passing {{null}}.
[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources
[ https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251300#comment-16251300 ]

Xingcan Cui commented on FLINK-8069:

Hi [~twalthr], thanks for raising this. Since the {{RowtimeAttributeDescriptor}} is only used in the {{TableSource}}, which reads data from external systems, do you think it's necessary to provide an empty watermark strategy for that?