[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16255301#comment-16255301
 ] 

ASF GitHub Bot commented on FLINK-8069:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5016


> Support empty watermark strategy for TableSources
> -
>
> Key: FLINK-8069
> URL: https://issues.apache.org/jira/browse/FLINK-8069
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>
> In case the underlying data stream source emits watermarks, it should be 
> possible to define an empty watermark strategy for rowtime attributes in the 
> {{RowtimeAttributeDescriptor}}.
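To make the issue concrete, here is a minimal, self-contained Scala model of a stream that already carries watermarks. All names (`EmptyStrategyModel`, `BoundedDelay`, `Empty`) are hypothetical and this is not Flink code. A generating strategy discards the upstream watermarks and emits its own. An "empty" strategy forwards the stream unchanged, which is the behavior this issue asks for.

```scala
object EmptyStrategyModel {
  // Simplified stream elements: data records and watermarks, each with a timestamp.
  sealed trait Element
  final case class Record(ts: Long) extends Element
  final case class Watermark(ts: Long) extends Element

  // Hypothetical strategy hierarchy; this is not Flink's API.
  sealed trait Strategy
  // Discards upstream watermarks and emits a new one after every record,
  // trailing the highest timestamp seen so far by `delay`.
  final case class BoundedDelay(delay: Long) extends Strategy
  // The "empty" strategy: forward the stream exactly as the source emitted it.
  case object Empty extends Strategy

  def assign(strategy: Strategy, input: Seq[Element]): Seq[Element] = strategy match {
    case Empty => input
    case BoundedDelay(delay) =>
      var maxTs = Long.MinValue
      input.flatMap {
        case r @ Record(ts) =>
          maxTs = math.max(maxTs, ts)
          Seq[Element](r, Watermark(maxTs - delay))
        case Watermark(_) =>
          Seq.empty[Element] // upstream watermarks are dropped
      }
  }
}
```

With `BoundedDelay`, the watermarks the source carefully emitted are lost. With `Empty`, they survive untouched.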



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16255018#comment-16255018
 ] 

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5016
  
The PR looks good.
I'll merge it.




[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16254957#comment-16254957
 ] 

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5016
  
Hi @xccui, thanks for the update. Regarding your questions:

1. I think `null` should not be supported.
2. Such a `TimestampExtractor` is already available: 
[`StreamRecordTimestamp`](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala)
 :-)





[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16254913#comment-16254913
 ] 

ASF GitHub Bot commented on FLINK-8069:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5016
  
Hi @fhueske, the PR has been updated. During the refactoring, two 
questions came up.

1. Do you think it's still necessary to allow the watermark strategy to be 
`null`?
2. Currently, we only provide an `ExistingField` extractor for the rowtime 
field. Considering that the `TableSource` may be created from an existing 
`DataStream`, I think we should also provide a corresponding extractor that 
automatically reads the timestamp from the `StreamRecord` and sets it as a 
field. What do you think?
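The distinction in question 2 can be sketched with a simplified, self-contained model. The types here are hypothetical stand-ins, not the flink-table classes. An `ExistingField`-style extractor reads the rowtime from a field of the row. A stream-record extractor (the role played by Flink's `StreamRecordTimestamp`, modeled here as the hypothetical `FromStreamRecord`) reuses the timestamp that the `DataStream` already attached to the record.

```scala
object ExtractorModel {
  // A row as a map from field name to value, plus the timestamp that the
  // DataStream attached to the enclosing stream record (if any).
  final case class StreamRecord(row: Map[String, Any], timestamp: Option[Long])

  sealed trait TimestampExtractor {
    def extract(record: StreamRecord): Long
  }

  // Reads the rowtime from an existing Long-typed field of the row.
  final case class ExistingField(field: String) extends TimestampExtractor {
    def extract(record: StreamRecord): Long = record.row(field) match {
      case ts: Long => ts
      case other    => throw new IllegalArgumentException(s"Field '$field' is not a Long: $other")
    }
  }

  // Reuses the timestamp assigned upstream; fails if none was set.
  case object FromStreamRecord extends TimestampExtractor {
    def extract(record: StreamRecord): Long =
      record.timestamp.getOrElse(
        throw new IllegalStateException("Record carries no timestamp"))
  }
}
```

The second extractor needs no rowtime field in the physical row, which is what makes it useful for a `TableSource` built on top of an existing `DataStream`.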




[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253370#comment-16253370
 ] 

ASF GitHub Bot commented on FLINK-8069:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5016
  
Thanks for the review @fhueske. I think you are right. A dedicated 
`WatermarkStrategy` with watermark-preserving semantics would be better. I'll 
update the PR according to your comments. Thanks.




[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253318#comment-16253318
 ] 

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5016#discussion_r151100405
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala ---
@@ -154,6 +154,40 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
+  def testRowtimeTableSourceWithoutWMStrategy(): Unit = {
--- End diff --

I'd be fine either way, but the test should ensure that watermarks are 
preserved.




[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253317#comment-16253317
 ] 

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5016#discussion_r151096563
  
--- Diff: docs/dev/table/sourceSinks.md ---
@@ -316,7 +316,7 @@ A table schema field of type `SQL_TIMESTAMP` can be declared as rowtime attribut
 
 * the name of the field, 
 * a `TimestampExtractor` that computes the actual value for the attribute (usually from one or more other attributes), and
-* a `WatermarkStrategy` that specifies how watermarks are generated for the the rowtime attribute.
+* a `WatermarkStrategy` that specifies how watermarks are generated for the the rowtime attribute (With the default value `null` to indicate no watermark should be generated).
--- End diff --

I would leave this part of the docs unchanged.




[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253314#comment-16253314
 ] 

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5016#discussion_r151096958
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala ---
@@ -62,12 +62,13 @@ trait DefinedRowtimeAttributes {
   *
   * @param attributeName The name of the rowtime attribute.
   * @param timestampExtractor The timestamp extractor to derive the values of the attribute.
-  * @param watermarkStrategy The watermark strategy associated with the attribute.
+  * @param watermarkStrategy The watermark strategy associated with the attribute. With the default
--- End diff --

Please revert the changes in this class.




[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253315#comment-16253315
 ] 

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5016#discussion_r151095005
  
--- Diff: docs/dev/table/sourceSinks.md ---
@@ -316,7 +316,7 @@ A table schema field of type `SQL_TIMESTAMP` can be declared as rowtime attribut
 
 * the name of the field, 
 * a `TimestampExtractor` that computes the actual value for the attribute (usually from one or more other attributes), and
-* a `WatermarkStrategy` that specifies how watermarks are generated for the the rowtime attribute.
+* a `WatermarkStrategy` that specifies how watermarks are generated for the the rowtime attribute (With the default value `null` to indicate no watermark should be generated).
--- End diff --

I think we should not use `null` to indicate that no watermarks should be 
assigned. 
Rather, we should provide a `WatermarkStrategy` which indicates that existing 
watermarks are preserved, like:

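```
// Marker strategy: keep the watermarks that the underlying DataStream already emits.
class PreserveWatermarks extends WatermarkStrategy

object PreserveWatermarks {
  // Shared instance in the companion object, so no marker instance is needed to obtain it.
  val INSTANCE: PreserveWatermarks = new PreserveWatermarks
}
```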
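One benefit of a dedicated marker over `null` is that the planner can match on it exhaustively. The following self-contained sketch uses a hypothetical sealed hierarchy modeled on this discussion (it does not reproduce the real `StreamTableSourceScan` code) to show how a scan could branch on the strategy.

```scala
object StrategyDispatch {
  sealed trait WatermarkStrategy
  // Periodic watermarks that trail the maximum timestamp by a fixed delay.
  final case class BoundedOutOfOrder(delayMs: Long) extends WatermarkStrategy
  // Watermarks derived from the contents of individual records.
  case object Punctuated extends WatermarkStrategy
  // Marker: keep whatever watermarks the source already emits.
  case object PreserveWatermarks extends WatermarkStrategy

  // Describes what a scan would do for each strategy. Because the trait is
  // sealed, the compiler warns if a new strategy is left unhandled, and there
  // is no null case that could be forgotten.
  def plan(strategy: WatermarkStrategy): String = strategy match {
    case BoundedOutOfOrder(d) => s"assign periodic watermarks lagging by $d ms"
    case Punctuated           => "derive watermarks from record contents"
    case PreserveWatermarks   => "forward the source's watermarks unchanged"
  }
}
```

With `null`, the same method would need an explicit null check that the compiler cannot enforce.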




[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253316#comment-16253316
 ] 

ASF GitHub Bot commented on FLINK-8069:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5016#discussion_r151099552
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala ---
@@ -154,6 +154,40 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
+  def testRowtimeTableSourceWithoutWMStrategy(): Unit = {
--- End diff --

The test does not really check that watermarks are preserved.

We would need a `TableSource` with a non-parallel `SourceFunction` that 
emits records and watermarks, similar to 
`TimeTestUtil.EventTimeSourceFunction`. 

There are two ways to ensure that the watermarks have been preserved:
1. easy / not precise: we emit late data from the TableSource and validate 
that the late data is not included in the result.
2. more complex / precise: we run a simple select/project query on the 
table and convert the query result into an append `DataStream`. Finally, we 
check with a custom operator that all watermarks have been preserved 
(other functions do not have access to watermarks).
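Both verification ideas can be prototyped outside Flink first. The sketch below is a self-contained simulation with hypothetical names and no Flink operators: the source emits records and watermarks, a projection forwards watermarks untouched, and two checks mirror the approaches above. (1) A record that is late relative to the preserved watermarks is dropped. (2) A watermark-collecting step confirms that the output carries exactly the source's watermarks.

```scala
object PreservationCheck {
  sealed trait Element
  final case class Record(ts: Long, payload: String) extends Element
  final case class Watermark(ts: Long) extends Element

  // A select/project step: transforms record payloads, forwards watermarks as-is.
  def project(in: Seq[Element])(f: String => String): Seq[Element] = in.map {
    case Record(ts, p) => Record(ts, f(p))
    case w: Watermark  => w
  }

  // Approach 1: drop records whose timestamp is not after the current watermark.
  // This only loosely mirrors how event-time operators treat late data.
  def dropLate(in: Seq[Element]): Seq[Record] = {
    var currentWm = Long.MinValue
    in.flatMap {
      case Watermark(ts)     => currentWm = math.max(currentWm, ts); None
      case r @ Record(ts, _) => if (ts > currentWm) Some(r) else None
    }
  }

  // Approach 2: collect the watermarks reaching the end of the pipeline,
  // as a custom operator with access to watermarks would.
  def watermarks(in: Seq[Element]): Seq[Long] = in.collect { case Watermark(ts) => ts }
}
```

Approach 1 only shows that some watermark arrived early enough to mark a record as late. Approach 2 compares the full watermark sequence, which is why it is the more precise check.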




[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252931#comment-16252931
 ] 

ASF GitHub Bot commented on FLINK-8069:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5016

[FLINK-8069] [table] Support empty watermark strategy for TableSources

## What is the purpose of the change

This PR enables an empty watermark strategy for 
`RowtimeAttributeDescriptor`.

## Brief change log

  - Add a default `null` value for the watermark strategy in 
`RowtimeAttributeDescriptor`.
  - Add a case in `StreamTableSourceScan` for the empty watermark strategy.
  - Add a test case and update the related docs.

## Verifying this change

The change can be verified by the newly added 
`testRowtimeTableSourceWithoutWMStrategy()`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8069

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5016.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5016


commit ba0fc2777f2c81c582032124d301f5ee301ae009
Author: Xingcan Cui 
Date:   2017-11-15T03:01:13Z

[FLINK-8069] [table] Support empty watermark strategy for TableSources






[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-14 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252794#comment-16252794
 ] 

Xingcan Cui commented on FLINK-8069:


Thanks for the explanation, [~twalthr].



[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-14 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251475#comment-16251475
 ] 

Timo Walther commented on FLINK-8069:
-

Suppose you have a TableSource like this: 
https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/table_java/sources/TaxiRideTableSource.java
where the custom TaxiRideSource already assigns timestamps and emits watermarks.

When implementing {{DefinedRowtimeAttributes}}, we need to define a 
{{RowtimeAttributeDescriptor("eventTime", new StreamRecordTimestamp(), WatermarkStrategy???)}}. 
So this use case needs an explicit empty watermark strategy that basically does nothing.
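A self-contained Scala model of this use case follows. The types are simplified, hypothetical stand-ins for the flink-table ones: the names mirror the comment, but the definitions and the `rowtimeAttributeDescriptors` method are assumptions, much smaller than the real API. The marker `PreserveSourceWatermarks` fills the `WatermarkStrategy???` slot, in line with the dedicated preserve-watermarks strategy the PR discussion settles on instead of `null`.

```scala
object TaxiRideModel {
  sealed trait TimestampExtractor
  // Stand-in for a StreamRecordTimestamp-style extractor: the source has
  // already assigned timestamps to the stream records.
  case object RecordTimestamp extends TimestampExtractor

  sealed trait WatermarkStrategy
  final case class BoundedOutOfOrder(delayMs: Long) extends WatermarkStrategy
  // Stand-in for the "empty" strategy: the source already emits watermarks.
  case object PreserveSourceWatermarks extends WatermarkStrategy

  final case class RowtimeAttributeDescriptor(
      attributeName: String,
      timestampExtractor: TimestampExtractor,
      watermarkStrategy: WatermarkStrategy)

  // Simplified stand-in for DefinedRowtimeAttributes (hypothetical signature).
  trait DefinedRowtimeAttributes {
    def rowtimeAttributeDescriptors: Seq[RowtimeAttributeDescriptor]
  }

  // A source like TaxiRideTableSource whose underlying SourceFunction already
  // assigns timestamps and emits watermarks.
  object TaxiRideSourceModel extends DefinedRowtimeAttributes {
    def rowtimeAttributeDescriptors: Seq[RowtimeAttributeDescriptor] = Seq(
      RowtimeAttributeDescriptor("eventTime", RecordTimestamp, PreserveSourceWatermarks))
  }

  // True if the planner would have to insert its own watermark assigner.
  def needsWatermarkAssigner(d: RowtimeAttributeDescriptor): Boolean =
    d.watermarkStrategy != PreserveSourceWatermarks
}
```

Without such a marker, the only options would be to pass `null` or to supply a generating strategy that overwrites the source's watermarks.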



[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-14 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251458#comment-16251458
 ] 

Xingcan Cui commented on FLINK-8069:


Hi [~twalthr], could you explain the use case in a bit more detail? 
That would be helpful. Thanks.



[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-14 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251320#comment-16251320
 ] 

Timo Walther commented on FLINK-8069:
-

Hi [~xccui], we just had a use case where we were forced to supply some strategy 
even though our data source already emits watermarks. We could either make this explicit 
with a new {{StreamRecordWatermark}} class, similar to {{StreamRecordTimestamp}}, 
or simply allow passing {{null}}.



[jira] [Commented] (FLINK-8069) Support empty watermark strategy for TableSources

2017-11-14 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251300#comment-16251300
 ] 

Xingcan Cui commented on FLINK-8069:


Hi [~twalthr], thanks for raising this. Since the 
{{RowtimeAttributeDescriptor}} is only used in the {{TableSource}}, which reads 
data from external systems, do you think it's necessary to provide an empty 
watermark strategy for that?
