[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...
Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3915 Thank you for your suggestion. It sounds good and will be friendlier to users than throwing an exception in `FlinkKafkaConsumerBase`. I'll fix it soon, thanks :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015242#comment-16015242 ] ASF GitHub Bot commented on FLINK-6352: --- Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3915 Thank you for your suggestion. It sounds good and will be friendlier to users than throwing an exception in `FlinkKafkaConsumerBase`. I'll fix it soon, thanks :)
> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Fang Yong
> Assignee: Fang Yong
> Fix For: 1.3.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of
> FlinkKafkaConsumer, and its value must be earliest/latest/none. This setting
> can only make the job consume from the beginning or from the most recent data;
> it cannot start consumption from a specific Kafka offset.
> So there should be a configuration item (such as
> "flink.source.start.time", with the format "yyyy-MM-dd HH:mm:ss") that
> allows the user to configure the initial offset of Kafka. The behavior of
> "flink.source.start.time" is as follows:
> 1) the job starts from a checkpoint / savepoint
> a> if the offset of a partition can be restored from the checkpoint/savepoint,
> "flink.source.start.time" will be ignored
> b> if there is no checkpoint/savepoint state for the partition (for example,
> the partition was newly added), "flink.source.start.time" will be used to
> initialize the offset of the partition
> 2) the job has no checkpoint / savepoint, so "flink.source.start.time" is used
> to initialize the Kafka offsets
> a> if "flink.source.start.time" is valid, use it to set the Kafka offset
> b> if "flink.source.start.time" is out of range, behave the same as today with
> no initial offset: get Kafka's current offset and start reading
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
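The per-partition decision order in 1) and 2) above can be sketched in plain Java. All of the names below (`initialOffset`, `resolveOffsetForTime`, and so on) are hypothetical helpers invented for illustration, not actual Flink API:

```java
import java.util.function.LongFunction;

// Hypothetical sketch of the per-partition start-offset decision described
// above; none of these names exist in Flink itself.
public class StartOffsetDecision {

    /**
     * @param restoredOffset       offset restored from a checkpoint/savepoint, or null
     * @param startTime            configured "flink.source.start.time" as epoch millis, or null
     * @param resolveOffsetForTime maps a timestamp to an offset, or null if out of range
     * @param currentOffset        Kafka's current offset for the partition
     */
    static long initialOffset(Long restoredOffset, Long startTime,
                              LongFunction<Long> resolveOffsetForTime,
                              long currentOffset) {
        if (restoredOffset != null) {
            return restoredOffset;          // 1a) restored state wins; start time is ignored
        }
        if (startTime != null) {
            Long resolved = resolveOffsetForTime.apply(startTime);
            if (resolved != null) {
                return resolved;            // 1b/2a) a valid start time initializes the offset
            }
        }
        return currentOffset;               // 2b) out of range: fall back to the current offset
    }

    public static void main(String[] args) {
        // Toy resolver: timestamps below 1000 map to offset 42, anything else is out of range.
        LongFunction<Long> resolver = ts -> ts < 1000L ? 42L : null;
        System.out.println(initialOffset(7L, 500L, resolver, 0L));    // restored offset wins
        System.out.println(initialOffset(null, 500L, resolver, 0L));  // resolved from start time
        System.out.println(initialOffset(null, 2000L, resolver, 0L)); // falls back to current
    }
}
```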
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015237#comment-16015237 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3915 @zjureel For Kafka 0.11, I would expect it to just extend `FlinkKafkaConsumer010`. As you can see, that is also the case right now for 010; it's extending `FlinkKafkaConsumer09` and not the base class.
[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3915 @zjureel For Kafka 0.11, I would expect it to just extend `FlinkKafkaConsumer010`. As you can see, that is also the case right now for 010; it's extending `FlinkKafkaConsumer09` and not the base class.
[jira] [Commented] (FLINK-6221) Add Prometheus support to metrics
[ https://issues.apache.org/jira/browse/FLINK-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015224#comment-16015224 ] ASF GitHub Bot commented on FLINK-6221: --- Github user mbode commented on the issue: https://github.com/apache/flink/pull/3833 @zentol Would you mind checking that I got the shading right?
> Add Prometheus support to metrics
> -
>
> Key: FLINK-6221
> URL: https://issues.apache.org/jira/browse/FLINK-6221
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.2.0
> Reporter: Joshua Griffith
> Assignee: Maximilian Bode
> Priority: Minor
>
> [Prometheus|https://prometheus.io/] is becoming popular for metrics and
> alerting. It's possible to use
> [statsd-exporter|https://github.com/prometheus/statsd_exporter] to load Flink
> metrics into Prometheus, but it would be far easier if Flink supported
> Prometheus as a metrics reporter. A [dropwizard
> client|https://github.com/prometheus/client_java/tree/master/simpleclient_dropwizard]
> exists that could be integrated into the existing metrics system.
[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter
Github user mbode commented on the issue: https://github.com/apache/flink/pull/3833 @zentol Would you mind checking that I got the shading right?
[jira] [Created] (FLINK-6620) Add KeyGroupCheckpointedOperator interface that works for checkpointing key-groups
Jingsong Lee created FLINK-6620: --- Summary: Add KeyGroupCheckpointedOperator interface that works for checkpointing key-groups Key: FLINK-6620 URL: https://issues.apache.org/jira/browse/FLINK-6620 Project: Flink Issue Type: Improvement Components: DataStream API Reporter: Jingsong Lee Priority: Minor
[~aljoscha] We have discussed it on: https://issues.apache.org/jira/browse/BEAM-1393
{code}
/**
 * This interface is used to checkpoint key-groups state.
 */
public interface KeyGroupCheckpointedOperator extends KeyGroupRestoringOperator {
    /**
     * Snapshots the state for a given {@code keyGroupIdx}.
     *
     * AbstractStreamOperator would call this hook in
     * AbstractStreamOperator.snapshotState() while iterating over the key groups.
     *
     * @param keyGroupIndex the id of the key-group to be put in the snapshot.
     * @param out the stream to write to.
     */
    void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception;
}
{code}
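As a rough illustration of how an operator might implement the proposed hook, the sketch below writes one key-group's state to the stream supplied by the runtime. The operator and its per-key-group counts are invented for the example; only the `snapshotKeyGroupState(int, DataOutputStream)` shape comes from the interface above:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

// Invented example operator; only the snapshotKeyGroupState signature
// mirrors the proposed interface.
public class CountingOperator {
    private final long[] countPerKeyGroup;

    public CountingOperator(int maxParallelism) {
        this.countPerKeyGroup = new long[maxParallelism];
    }

    public void add(int keyGroupIndex) {
        countPerKeyGroup[keyGroupIndex]++;
    }

    /** Writes the state of a single key group to the snapshot stream. */
    public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
        out.writeInt(keyGroupIndex);                    // tag the entry with its key group
        out.writeLong(countPerKeyGroup[keyGroupIndex]); // the key group's state
    }

    public static void main(String[] args) throws Exception {
        CountingOperator op = new CountingOperator(4);
        op.add(2);
        op.add(2);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        op.snapshotKeyGroupState(2, new DataOutputStream(buf));
        System.out.println(buf.size()); // 4-byte int + 8-byte long = 12
    }
}
```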
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015217#comment-16015217 ] ASF GitHub Bot commented on FLINK-6352: --- Github user zjureel commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r117162289
--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.
- 0.10.0.1
+ 0.10.1.0
--- End diff --
The dependency trees of 0.10.0.1 and 0.10.1.0 are the same when I use mvn dependency:tree to print the dependency information:
+- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile
+- org.apache.kafka:kafka-clients:jar:0.10.1.0:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user zjureel commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r117162289
--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.
- 0.10.0.1
+ 0.10.1.0
--- End diff --
The dependency trees of 0.10.0.1 and 0.10.1.0 are the same when I use mvn dependency:tree to print the dependency information:
+- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile
+- org.apache.kafka:kafka-clients:jar:0.10.1.0:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015216#comment-16015216 ] ASF GitHub Bot commented on FLINK-6352: --- Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3915 @tzulitai Glad to hear from you. In fact I was also torn on whether to put the `setStartFromSpecificDate` method into `FlinkKafkaConsumerBase`, and I put it there in the end for two reasons:
1. All the other methods that set the Kafka start offset are in `FlinkKafkaConsumerBase`; to keep things aligned, I put `setStartFromSpecificDate` in `FlinkKafkaConsumerBase`.
2. For subsequent versions of Kafka, such as 0.11, this feature should also be available, but the consumer may need to extend `FlinkKafkaConsumerBase` directly. I think this method will be used in multiple implementations, so I put `setStartFromSpecificDate` in `FlinkKafkaConsumerBase`.
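A method like `setStartFromSpecificDate` would first have to turn the configured date string into an epoch timestamp before any timestamp-to-offset lookup can happen. A minimal sketch of that step follows; the class name and the fixed UTC zone are illustrative assumptions, and the pattern is the one proposed in the issue (read here as "yyyy-MM-dd HH:mm:ss"):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Made-up helper: turns a configured start-time string into the epoch
// milliseconds that a timestamp-based offset lookup would need.
public class StartTimeParser {

    public static long toEpochMillis(String startTime) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        format.setTimeZone(TimeZone.getTimeZone("UTC")); // pin the zone so results are reproducible
        return format.parse(startTime).getTime();
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(toEpochMillis("2017-05-18 00:00:00")); // 1495065600000
    }
}
```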
[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...
Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3915 @tzulitai Glad to hear from you. In fact I was also torn on whether to put the `setStartFromSpecificDate` method into `FlinkKafkaConsumerBase`, and I put it there in the end for two reasons:
1. All the other methods that set the Kafka start offset are in `FlinkKafkaConsumerBase`; to keep things aligned, I put `setStartFromSpecificDate` in `FlinkKafkaConsumerBase`.
2. For subsequent versions of Kafka, such as 0.11, this feature should also be available, but the consumer may need to extend `FlinkKafkaConsumerBase` directly. I think this method will be used in multiple implementations, so I put `setStartFromSpecificDate` in `FlinkKafkaConsumerBase`.
[jira] [Commented] (FLINK-6618) Fix GroupWindowStringExpressionTest testcase bug
[ https://issues.apache.org/jira/browse/FLINK-6618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015213#comment-16015213 ] ASF GitHub Bot commented on FLINK-6618: --- GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/3936 [FLINK-6618][table] Fix GroupWindowStringExpressionTest test case bug
In this PR I fixed the `GroupWindowStringExpressionTest` test case bug.
- [x] General
  - The pull request references the related JIRA issue ("[FLINK-6618][table] Fix GroupWindowStringExpressionTest testcase bug")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/sunjincheng121/flink FLINK-6618-PR
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/3936.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #3936
commit f331798000bbf8ce6316862545c761583e8b9eef
Author: sunjincheng121
Date: 2017-05-18T05:02:24Z
    [FLINK-6618][table] Fix GroupWindowStringExpressionTest testcase bug
> Fix GroupWindowStringExpressionTest testcase bug
> -
>
> Key: FLINK-6618
> URL: https://issues.apache.org/jira/browse/FLINK-6618
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> I found 2 bugs, as follows:
> 1. {{GroupWindowStringExpressionTest}} testcase bug:
> {{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resJava.logicalPlan)}} -> {{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resScala.logicalPlan)}}
> 2. When I fixed the bug above, we got another bug:
> {code}
> java.lang.AssertionError: Logical Plans do not match
> Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as '_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) as 'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 'long, 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
> Actual :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as '_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 1440.millis, 720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, WeightedAvg(ArrayBuffer('int, 'int)) as 'TMP_3),Project(ArrayBuffer('string, 'int, 'long, 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
> {code}
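The first bug described above is the classic copy-paste assertion that compares a value to itself, which can never fail. A tiny standalone illustration (the plan strings are invented stand-ins, not real Flink logical plans):

```java
// The plan strings below stand in for resJava.logicalPlan and
// resScala.logicalPlan; the point is only that a self-comparison always passes.
public class SelfCompareBug {

    static boolean buggyCheck(String resJavaPlan, String resScalaPlan) {
        return resJavaPlan.equals(resJavaPlan);   // always true: compares the plan to itself
    }

    static boolean fixedCheck(String resJavaPlan, String resScalaPlan) {
        return resJavaPlan.equals(resScalaPlan);  // actually compares the two plans
    }

    public static void main(String[] args) {
        String javaPlan = "Project('string, 'TMP_0)";
        String scalaPlan = "Project('string, 'TMP_4)";
        System.out.println(buggyCheck(javaPlan, scalaPlan)); // true, even though the plans differ
        System.out.println(fixedCheck(javaPlan, scalaPlan)); // false: the mismatch is detected
    }
}
```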
[GitHub] flink pull request #3936: [FLINK-6618][table] Fix GroupWindowStringExpressio...
GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/3936 [FLINK-6618][table] Fix GroupWindowStringExpressionTest test case bug
In this PR I fixed the `GroupWindowStringExpressionTest` test case bug.
- [x] General
  - The pull request references the related JIRA issue ("[FLINK-6618][table] Fix GroupWindowStringExpressionTest testcase bug")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/sunjincheng121/flink FLINK-6618-PR
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/3936.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #3936
commit f331798000bbf8ce6316862545c761583e8b9eef
Author: sunjincheng121
Date: 2017-05-18T05:02:24Z
    [FLINK-6618][table] Fix GroupWindowStringExpressionTest testcase bug
[jira] [Updated] (FLINK-6618) Fix GroupWindowStringExpressionTest testcase bug
[ https://issues.apache.org/jira/browse/FLINK-6618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6618: --- Summary: Fix GroupWindowStringExpressionTest testcase bug (was: Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans.)
[jira] [Commented] (FLINK-6495) Migrate Akka configuration options
[ https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015191#comment-16015191 ] ASF GitHub Bot commented on FLINK-6495: --- GitHub user zjureel opened a pull request: https://github.com/apache/flink/pull/3935 [FLINK-6495] Migrate Akka configuration options
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.
- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/zjureel/flink FLINK-6495
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/3935.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #3935
commit c87718694052e499875d78c7ef2bc9573dc0cc4e
Author: zjureel
Date: 2017-05-18T04:34:40Z
    [FLINK-6495] Migrate Akka configuration options
> Migrate Akka configuration options
> --
>
> Key: FLINK-6495
> URL: https://issues.apache.org/jira/browse/FLINK-6495
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Reporter: Chesnay Schepler
> Assignee: Fang Yong
>
[GitHub] flink pull request #3935: [FLINK-6495] Migrate Akka configuration options
GitHub user zjureel opened a pull request: https://github.com/apache/flink/pull/3935 [FLINK-6495] Migrate Akka configuration options
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.
- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/zjureel/flink FLINK-6495
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/3935.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #3935
commit c87718694052e499875d78c7ef2bc9573dc0cc4e
Author: zjureel
Date: 2017-05-18T04:34:40Z
    [FLINK-6495] Migrate Akka configuration options
[jira] [Updated] (FLINK-6618) Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans.
[ https://issues.apache.org/jira/browse/FLINK-6618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6618: --- Issue Type: Sub-task (was: Bug) Parent: FLINK-6619
[jira] [Created] (FLINK-6619) Check Table API & SQL support for 1.3.0 RC01 Release
sunjincheng created FLINK-6619: -- Summary: Check Table API & SQL support for 1.3.0 RC01 Release Key: FLINK-6619 URL: https://issues.apache.org/jira/browse/FLINK-6619 Project: Flink Issue Type: Test Components: Table API & SQL Affects Versions: 1.3.0 Reporter: sunjincheng Assignee: sunjincheng
In this JIRA I will do the following tasks for the Flink 1.3.0 RC01 release:
* Check that the JAVA and SCALA logical plans are consistent.
* Check that the SQL and Table API logical plans are consistent.
* Check that UDF, UDTF, and UDAF are working properly in group-windows and over-windows.
* Check that all built-in Agg on Batch and Stream are working properly.
As I work through the tasks above, I'll create sub-tasks.
[jira] [Updated] (FLINK-6617) Improve JAVA and SCALA logical plans consistent test
[ https://issues.apache.org/jira/browse/FLINK-6617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6617: --- Issue Type: Sub-task (was: Test) Parent: FLINK-6619
> Improve JAVA and SCALA logical plans consistent test
> -
>
> Key: FLINK-6617
> URL: https://issues.apache.org/jira/browse/FLINK-6617
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Currently, we need some `StringExpression` tests for all JAVA and SCALA APIs,
> such as `GroupAggregations`, `GroupWindowAggregaton` (Session, Tumble), `Calc`, etc.
[jira] [Updated] (FLINK-6618) Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans.
[ https://issues.apache.org/jira/browse/FLINK-6618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6618: --- Description: I found 2 bugs as follows: 1. {{GroupWindowStringExpressionTest}} test case bug: {{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resJava.logicalPlan)}} -> {{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resScala.logicalPlan)}} 2. When I fixed the bug above, we got another bug: {code} java.lang.AssertionError: Logical Plans do not match Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as '_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) as 'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 'long, 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime) Actual :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as '_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 1440.millis, 720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, WeightedAvg(ArrayBuffer('int, 'int)) as 'TMP_3),Project(ArrayBuffer('string, 'int, 'long, 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime) {code} was: I found 2 bugs as follows: 1. {{GroupWindowStringExpressionTest}} test case bug, {{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resJava.logicalPlan)}} -> {{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resScala.logicalPlan)}} 2. 
When I fixed the bug above, we got another bug: {code} java.lang.AssertionError: Logical Plans do not match Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as '_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) as 'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 'long, 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime) Actual :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as '_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 1440.millis, 720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, WeightedAvg(ArrayBuffer('int, 'int)) as 'TMP_3),Project(ArrayBuffer('string, 'int, 'long, 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime) {code} > Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans. > - > > Key: FLINK-6618 > URL: https://issues.apache.org/jira/browse/FLINK-6618 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > I found 2 bugs as follows: > 1. {{GroupWindowStringExpressionTest}} test case bug: > {{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, > resJava.logicalPlan)}} -> {{Assert.assertEquals("Logical Plans do not match", > resJava.logicalPlan, resScala.logicalPlan)}} > 2. 
When I fixed the bug above, we got another bug: > {code} > java.lang.AssertionError: Logical Plans do not match > Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as > '_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, > 1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) > as 'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, > WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, > 'long, > 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER > int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime) > Actual :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as > '_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, > 1440.millis, > 720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, > sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, > WeightedAvg(ArrayBuffer('int,
[jira] [Created] (FLINK-6618) Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans.
sunjincheng created FLINK-6618: -- Summary: Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans. Key: FLINK-6618 URL: https://issues.apache.org/jira/browse/FLINK-6618 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.0 Reporter: sunjincheng Assignee: sunjincheng I found 2 bugs as follows: 1. `GroupWindowStringExpressionTest` test case bug, `Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resJava.logicalPlan)` -> `Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resScala.logicalPlan)` 2. When I fixed the bug above, we got another bug: {code} java.lang.AssertionError: Logical Plans do not match Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as '_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) as 'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 'long, 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime) Actual :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as '_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 1440.millis, 720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, WeightedAvg(ArrayBuffer('int, 'int)) as 'TMP_3),Project(ArrayBuffer('string, 'int, 'long, 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime) {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6617) Improve JAVA and SCALA logical plans consistent test
sunjincheng created FLINK-6617: -- Summary: Improve JAVA and SCALA logical plans consistent test Key: FLINK-6617 URL: https://issues.apache.org/jira/browse/FLINK-6617 Project: Flink Issue Type: Test Components: Table API & SQL Affects Versions: 1.3.0 Reporter: sunjincheng Assignee: sunjincheng Currently, we need `StringExpression` tests for all JAVA and SCALA APIs, such as `GroupAggregations`, `GroupWindowAggregaton` (Session, Tumble), `Calc`, etc.
[jira] [Commented] (FLINK-6608) Relax Kerberos login contexts parsing by trimming whitespaces in contexts list
[ https://issues.apache.org/jira/browse/FLINK-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015062#comment-16015062 ] ASF GitHub Bot commented on FLINK-6608: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3928#discussion_r117146767 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -230,10 +230,15 @@ private void validate() { } private static List parseList(String value) { - if(value == null) { + if(value == null || value.isEmpty()) { return Collections.emptyList(); } - return Arrays.asList(value.split(",")); + + return Arrays.asList(value + .replaceAll("\\s*,\\s*", ",") // remove whitespaces surrounding commas + .replaceAll(",,", ",") // remove empty entries --- End diff -- Hi, @tzulitai `.replaceAll("\\s*,\\s*", ",").replaceAll(",,", ",") ` cannot deal with `" a, b,,, c d, e "` I think we can use the expression as follows: `str.trim().replaceAll("(\\s*,+\\s*)+", ",")` OR `str.replace("/^\\s+|\\s+$/g","").replaceAll("(\\s*,\\s*)+", ",");` > Relax Kerberos login contexts parsing by trimming whitespaces in contexts list > -- > > Key: FLINK-6608 > URL: https://issues.apache.org/jira/browse/FLINK-6608 > Project: Flink > Issue Type: Improvement > Components: Configuration, Security >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Minor > Fix For: 1.3.0, 1.4.0 > > > The Kerberos login contexts list parsing right now isn't quite user-friendly. > The list must be provided as: {{security.kerberos.login.contexts: > Client,KafkaClient}}, without any whitespace in between. > We can relax this to be more user-friendly by trimming any whitespaces in the > list. > A user actually stumbled across this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problems-with-Kerberos-Kafka-connection-in-version-1-2-0-td12580.html#a12589
[GitHub] flink pull request #3928: [FLINK-6608] [security, config] Relax Kerberos log...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3928#discussion_r117146767 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -230,10 +230,15 @@ private void validate() { } private static List parseList(String value) { - if(value == null) { + if(value == null || value.isEmpty()) { return Collections.emptyList(); } - return Arrays.asList(value.split(",")); + + return Arrays.asList(value + .replaceAll("\\s*,\\s*", ",") // remove whitespaces surrounding commas + .replaceAll(",,", ",") // remove empty entries --- End diff -- Hi, @tzulitai `.replaceAll("\\s*,\\s*", ",").replaceAll(",,", ",") ` cannot deal with `" a, b,,, c d, e "` I think we can use the expression as follows: `str.trim().replaceAll("(\\s*,+\\s*)+", ",")` OR `str.replace("/^\\s+|\\s+$/g","").replaceAll("(\\s*,\\s*)+", ",");`
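The trim-then-split variant discussed in this thread can be sketched as a standalone method (the class name `ParseListSketch` is hypothetical and this is not the actual `SecurityUtils` code — it only illustrates the regex idea from the review comment):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone sketch of the parseList idea from the review thread:
// trim the whole string first, then split on runs of commas and the
// whitespace around them, so inputs like " a, b,,, c d, e " come out clean.
public class ParseListSketch {

    static List<String> parseList(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Collections.emptyList();
        }
        // "(\\s*,\\s*)+" collapses ",", " , " and ",,," into one separator.
        return Arrays.asList(value.trim().split("(\\s*,\\s*)+"));
    }

    public static void main(String[] args) {
        // The problematic input from the comment above:
        System.out.println(parseList(" a, b,,, c d, e "));   // [a, b, c d, e]
        System.out.println(parseList("Client, KafkaClient")); // [Client, KafkaClient]
    }
}
```

One remaining edge case: a string that starts with a comma still yields an empty first entry, because `String.split` only discards trailing empty strings, not leading ones.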
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015057#comment-16015057 ] ASF GitHub Bot commented on FLINK-6075: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r117146377 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{RowTypeInfo, ListTypeInfo} +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.configuration.Configuration +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import java.util.{List => JList, ArrayList => JArrayList} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Process Function used for the aggregate in bounded rowtime sort without offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param fieldCount Is used to indicate fields in the current element to forward + * @param inputType It is used to mark the type of the incoming data + * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation + */ +class RowTimeSortProcessFunction( + private val fieldCount: Int, + private val inputRowType: CRowTypeInfo, + private val rowComparator: CollectionRowComparator) +extends ProcessFunction[CRow, CRow] { + + Preconditions.checkNotNull(rowComparator) + + private val sortArray: ArrayList[Row] = new ArrayList[Row] + + // the state which keeps all the events that are not expired. 
+ // Each timestamp will contain an associated list with the events + // received at that timestamp + private var dataState: MapState[Long, JList[Row]] = _ + +// the state which keeps the last triggering timestamp to filter late events + private var lastTriggeringTsState: ValueState[Long] = _ + + private var outputC: CRow = _ + + + override def open(config: Configuration) { + +val keyTypeInformation: TypeInformation[Long] = + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]] +val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row]( +inputRowType.asInstanceOf[CRowTypeInfo].rowType) + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]( +"dataState", +keyTypeInformation, +valueTypeInformation) + +dataState = getRuntimeContext.getMapState(mapStateDescriptor) + +val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long]) +lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor) + } + + + override def processElement( +inputC: CRow, +ctx: ProcessFunction[CRow,
[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...
Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r117146377 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{RowTypeInfo, ListTypeInfo} +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.configuration.Configuration +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import java.util.{List => JList, ArrayList => JArrayList} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Process Function used for the aggregate in bounded rowtime sort without offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param fieldCount Is used to indicate fields in the current element to forward + * @param inputType It is used to mark the type of the incoming data + * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation + */ +class RowTimeSortProcessFunction( + private val fieldCount: Int, + private val inputRowType: CRowTypeInfo, + private val rowComparator: CollectionRowComparator) +extends ProcessFunction[CRow, CRow] { + + Preconditions.checkNotNull(rowComparator) + + private val sortArray: ArrayList[Row] = new ArrayList[Row] + + // the state which keeps all the events that are not expired. 
+ // Each timestamp will contain an associated list with the events + // received at that timestamp + private var dataState: MapState[Long, JList[Row]] = _ + +// the state which keeps the last triggering timestamp to filter late events + private var lastTriggeringTsState: ValueState[Long] = _ + + private var outputC: CRow = _ + + + override def open(config: Configuration) { + +val keyTypeInformation: TypeInformation[Long] = + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]] +val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row]( +inputRowType.asInstanceOf[CRowTypeInfo].rowType) + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]( +"dataState", +keyTypeInformation, +valueTypeInformation) + +dataState = getRuntimeContext.getMapState(mapStateDescriptor) + +val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long]) +lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor) + } + + + override def processElement( +inputC: CRow, +ctx: ProcessFunction[CRow, CRow]#Context, +out: Collector[CRow]): Unit = { + + val input = inputC.row + + if( outputC == null) { + outputC = new CRow(input, true) +} + +// triggering timestamp for trigger
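Stripped of Flink's state and timer APIs, the core idea of the function in the diff above — buffer rows per event timestamp, drop late arrivals, and emit buffered rows in timestamp order (with a secondary sort inside each timestamp) once time advances — can be sketched as follows. This is an illustration, not the PR's code: a plain `TreeMap` stands in for Flink's `MapState`, a field for `ValueState`, and `onWatermark` for the event-time timer callback.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal, Flink-free sketch of the buffering logic in
// RowTimeSortProcessFunction: rows are grouped by event timestamp and
// released in timestamp order when the watermark passes them.
public class RowTimeSortSketch<T> {

    // Stand-in for MapState<Long, List<Row>>: timestamp -> buffered rows.
    private final TreeMap<Long, List<T>> dataState = new TreeMap<>();
    // Stand-in for ValueState<Long>: last timestamp already emitted.
    private long lastTriggeringTs = Long.MIN_VALUE;
    private final Comparator<T> rowComparator;

    public RowTimeSortSketch(Comparator<T> rowComparator) {
        this.rowComparator = rowComparator;
    }

    /** Buffer an element; late elements (ts <= last trigger) are dropped. */
    public void processElement(long timestamp, T row) {
        if (timestamp > lastTriggeringTs) {
            dataState.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(row);
        }
    }

    /** Emit everything up to and including the watermark, in sorted order. */
    public List<T> onWatermark(long watermark) {
        List<T> out = new ArrayList<>();
        for (Map.Entry<Long, List<T>> e : dataState.headMap(watermark, true).entrySet()) {
            List<T> rows = new ArrayList<>(e.getValue());
            rows.sort(rowComparator); // secondary sort within one timestamp
            out.addAll(rows);
        }
        dataState.headMap(watermark, true).clear();
        lastTriggeringTs = Math.max(lastTriggeringTs, watermark);
        return out;
    }

    public static void main(String[] args) {
        RowTimeSortSketch<String> sorter = new RowTimeSortSketch<>(Comparator.naturalOrder());
        sorter.processElement(3, "b");
        sorter.processElement(1, "a");
        sorter.processElement(3, "a");
        System.out.println(sorter.onWatermark(3)); // [a, a, b]
    }
}
```

The `TreeMap` keeps timestamps sorted for free; in the real function the timestamps come out of `MapState` unordered and must be collected and sorted before emission.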
[jira] [Commented] (FLINK-6608) Relax Kerberos login contexts parsing by trimming whitespaces in contexts list
[ https://issues.apache.org/jira/browse/FLINK-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015027#comment-16015027 ] ASF GitHub Bot commented on FLINK-6608: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/3928#discussion_r117144044 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -230,10 +231,21 @@ private void validate() { } private static List parseList(String value) { - if(value == null) { + if(value == null || value.isEmpty()) { return Collections.emptyList(); } - return Arrays.asList(value.split(",")); --- End diff -- The wisdom of that paragraph has been debated extensively on stackoverflow. I'll just say that the `StringTokenizer` is not deprecated and, in my opinion, fits this scenario uniquely well. Go ahead either way. > Relax Kerberos login contexts parsing by trimming whitespaces in contexts list > -- > > Key: FLINK-6608 > URL: https://issues.apache.org/jira/browse/FLINK-6608 > Project: Flink > Issue Type: Improvement > Components: Configuration, Security >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Minor > Fix For: 1.3.0, 1.4.0 > > > The Kerberos login contexts list parsing right now isn't quite user-friendly. > The list must be provided as: {{security.kerberos.login.contexts: > Client,KafkaClient}}, without any whitespace in between. > We can relax this to be more user-friendly by trimming any whitespaces in the > list. > A user actually stumbled across this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problems-with-Kerberos-Kafka-connection-in-version-1-2-0-td12580.html#a12589
[GitHub] flink pull request #3928: [FLINK-6608] [security, config] Relax Kerberos log...
Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/3928#discussion_r117144044 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -230,10 +231,21 @@ private void validate() { } private static List parseList(String value) { - if(value == null) { + if(value == null || value.isEmpty()) { return Collections.emptyList(); } - return Arrays.asList(value.split(",")); --- End diff -- The wisdom of that paragraph has been debated extensively on stackoverflow. I'll just say that the `StringTokenizer` is not deprecated and, in my opinion, fits this scenario uniquely well. Go ahead either way.
[jira] [Commented] (FLINK-6608) Relax Kerberos login contexts parsing by trimming whitespaces in contexts list
[ https://issues.apache.org/jira/browse/FLINK-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015010#comment-16015010 ] ASF GitHub Bot commented on FLINK-6608: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3928#discussion_r117142993 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -230,10 +231,21 @@ private void validate() { } private static List parseList(String value) { - if(value == null) { + if(value == null || value.isEmpty()) { return Collections.emptyList(); } - return Arrays.asList(value.split(",")); --- End diff -- @EronWright your suggestion will work. But I'd like to use a regular expression. The JDK doc also makes the same recommendation: `StringTokenizer is a legacy class that is retained for compatibility reasons although its use is discouraged in new code. It is recommended that anyone seeking this functionality use the split method of String or the java.util.regex package instead.` Please see: http://docs.oracle.com/javase/7/docs/api/java/util/StringTokenizer.html What do you think? @EronWright @tzulitai Best, SunJincheng > Relax Kerberos login contexts parsing by trimming whitespaces in contexts list > -- > > Key: FLINK-6608 > URL: https://issues.apache.org/jira/browse/FLINK-6608 > Project: Flink > Issue Type: Improvement > Components: Configuration, Security >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Minor > Fix For: 1.3.0, 1.4.0 > > > The Kerberos login contexts list parsing right now isn't quite user-friendly. > The list must be provided as: {{security.kerberos.login.contexts: > Client,KafkaClient}}, without any whitespace in between. > We can relax this to be more user-friendly by trimming any whitespaces in the > list. 
> A user actually stumbled across this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problems-with-Kerberos-Kafka-connection-in-version-1-2-0-td12580.html#a12589
[GitHub] flink pull request #3928: [FLINK-6608] [security, config] Relax Kerberos log...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3928#discussion_r117142993 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -230,10 +231,21 @@ private void validate() { } private static List parseList(String value) { - if(value == null) { + if(value == null || value.isEmpty()) { return Collections.emptyList(); } - return Arrays.asList(value.split(",")); --- End diff -- @EronWright your suggestion will work. But I'd like to use a regular expression. The JDK doc also makes the same recommendation: `StringTokenizer is a legacy class that is retained for compatibility reasons although its use is discouraged in new code. It is recommended that anyone seeking this functionality use the split method of String or the java.util.regex package instead.` Please see: http://docs.oracle.com/javase/7/docs/api/java/util/StringTokenizer.html What do you think? @EronWright @tzulitai Best, SunJincheng
[jira] [Commented] (FLINK-6065) Make TransportClient for ES5 pluggable
[ https://issues.apache.org/jira/browse/FLINK-6065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014957#comment-16014957 ] ASF GitHub Bot commented on FLINK-6065: --- Github user sschaef commented on the issue: https://github.com/apache/flink/pull/3934 The Java7 run failed because my solution uses Java8 features. I guess I have to find another way to fix this issue then. > Make TransportClient for ES5 pluggable > -- > > Key: FLINK-6065 > URL: https://issues.apache.org/jira/browse/FLINK-6065 > Project: Flink > Issue Type: Improvement > Components: ElasticSearch Connector, Streaming Connectors >Reporter: Robert Metzger > > This JIRA is based on a user request: > http://stackoverflow.com/questions/42807454/flink-xpack-elasticsearch-5-elasticsearchsecurityexception-missing-autentication?noredirect=1#comment72728053_42807454 > Currently, in the {{Elasticsearch5ApiCallBridge}} the > {{PreBuiltTransportClient}} is hardcoded. It would be nice to make this > client pluggable to allow using other clients such as the > {{PreBuiltXPackTransportClient}}.
[GitHub] flink issue #3934: [FLINK-6065] Add initClient method to ElasticsearchApiCal...
Github user sschaef commented on the issue: https://github.com/apache/flink/pull/3934 The Java7 run failed because my solution uses Java8 features. I guess I have to find another way to fix this issue then.
[jira] [Commented] (FLINK-6578) SharedBuffer creates self-loops when having elements with same value/timestamp.
[ https://issues.apache.org/jira/browse/FLINK-6578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014903#comment-16014903 ] Ted Yu commented on FLINK-6578: --- SharedBuffer.java isn't shown in the PR, so allow me to comment here. {code} public int hashCode() { return (int) (31 * (timestamp ^ timestamp >>> 32) + 31 * value.hashCode()) + counter; {code} Multiplier of 31 is applied to both (timestamp ^ timestamp >>> 32) and value.hashCode(). The following is probably the right expression: {code} return (int) 31 * (31 * (timestamp ^ timestamp >>> 32) + value.hashCode()) + counter; {code} > SharedBuffer creates self-loops when having elements with same > value/timestamp. > --- > > Key: FLINK-6578 > URL: https://issues.apache.org/jira/browse/FLINK-6578 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.3.0 > > > This is a test that fails with the current implementation due to the fact > that the looping state accepts the two {{middleEvent1}} elements but the > shared buffer cannot distinguish between them and gets trapped in an infinite > loop leading to running out of memory. 
> {code}
> @Test
> public void testEagerZeroOrMoreSameElement() {
>     List<StreamRecord<Event>> inputEvents = new ArrayList<>();
>
>     Event startEvent = new Event(40, "c", 1.0);
>     Event middleEvent1 = new Event(41, "a", 2.0);
>     Event middleEvent2 = new Event(42, "a", 3.0);
>     Event middleEvent3 = new Event(43, "a", 4.0);
>     Event end1 = new Event(44, "b", 5.0);
>
>     inputEvents.add(new StreamRecord<>(startEvent, 1));
>     inputEvents.add(new StreamRecord<>(middleEvent1, 3));
>     inputEvents.add(new StreamRecord<>(middleEvent1, 3));
>     inputEvents.add(new StreamRecord<>(middleEvent1, 3));
>     inputEvents.add(new StreamRecord<>(middleEvent2, 4));
>     inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
>     inputEvents.add(new StreamRecord<>(middleEvent3, 6));
>     inputEvents.add(new StreamRecord<>(middleEvent3, 6));
>     inputEvents.add(new StreamRecord<>(end1, 7));
>
>     Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
>         private static final long serialVersionUID = 5726188262756267490L;
>
>         @Override
>         public boolean filter(Event value) throws Exception {
>             return value.getName().equals("c");
>         }
>     }).followedBy("middle").where(new SimpleCondition<Event>() {
>         private static final long serialVersionUID = 5726188262756267490L;
>
>         @Override
>         public boolean filter(Event value) throws Exception {
>             return value.getName().equals("a");
>         }
>     }).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
>         private static final long serialVersionUID = 5726188262756267490L;
>
>         @Override
>         public boolean filter(Event value) throws Exception {
>             return value.getName().equals("b");
>         }
>     });
>
>     NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
>
>     final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
>
>     compareMaps(resultingPatterns, Lists.newArrayList(
>         Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
>         Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
>         Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
>         Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
>         Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
>         Lists.newArrayList(startEvent, middleEvent1, end1),
>         Lists.newArrayList(startEvent, end1)
>     ));
> }
> {code}
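Ted Yu's observation about the 31 multiplier above can be checked with a small standalone sketch (illustrative only — this is not the actual SharedBuffer code, and the field values are arbitrary). In the original expression, the timestamp hash and the value hash each get their own single factor of 31, so swapping their contributions produces a collision; the nested form keeps them distinct.

```java
// Illustrative comparison of the two hash expressions from the review
// comment above; not the actual SharedBuffer code.
public class HashCompare {

    // original: both terms multiplied by 31 independently, so the
    // timestamp hash and the value hash are interchangeable
    static int original(long timestamp, int valueHash, int counter) {
        return (int) (31 * (timestamp ^ timestamp >>> 32) + 31 * valueHash) + counter;
    }

    // proposed: standard accumulator pattern, result = 31 * result + field
    static int proposed(long timestamp, int valueHash, int counter) {
        return (int) (31 * (31 * (timestamp ^ timestamp >>> 32) + valueHash)) + counter;
    }

    public static void main(String[] args) {
        // swapping the timestamp hash (1) and the value hash (2)
        // collides under the original expression...
        System.out.println(original(1L, 2, 0) == original(2L, 1, 0)); // true
        // ...but not under the nested, proposed one
        System.out.println(proposed(1L, 2, 0) == proposed(2L, 1, 0)); // false
    }
}
```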
[jira] [Commented] (FLINK-6604) Remove Java Serialization from the CEP library.
[ https://issues.apache.org/jira/browse/FLINK-6604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014894#comment-16014894 ] Ted Yu commented on FLINK-6604: --- I was talking about DeweyNumberSerializer in the above comment. > Remove Java Serialization from the CEP library. > --- > > Key: FLINK-6604 > URL: https://issues.apache.org/jira/browse/FLINK-6604 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > >
[jira] [Commented] (FLINK-6604) Remove Java Serialization from the CEP library.
[ https://issues.apache.org/jira/browse/FLINK-6604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014893#comment-16014893 ] Ted Yu commented on FLINK-6604: --- equals() only checks the class; hashCode() should be aligned with it to be consistent. > Remove Java Serialization from the CEP library. > --- > > Key: FLINK-6604 > URL: https://issues.apache.org/jira/browse/FLINK-6604 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > >
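The consistency rule behind this comment can be sketched with a hypothetical stateless serializer (the class name is illustrative, not Flink's actual DeweyNumberSerializer): when equals() treats any two instances of a class as equal, hashCode() must be derived from the class alone, never from per-instance state.

```java
// Hypothetical stateless serializer illustrating the equals/hashCode
// contract raised above; not an actual Flink class.
public final class StatelessSerializer {

    @Override
    public boolean equals(Object obj) {
        // stateless: any two instances of this class are interchangeable
        return obj != null && obj.getClass() == getClass();
    }

    @Override
    public int hashCode() {
        // aligned with equals(): derived from the class only, so all
        // instances that compare equal also hash equal
        return getClass().hashCode();
    }
}
```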
[jira] [Commented] (FLINK-6583) Enable QueryConfig in count base GroupWindow
[ https://issues.apache.org/jira/browse/FLINK-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014847#comment-16014847 ] ASF GitHub Bot commented on FLINK-6583: --- Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/3919#discussion_r117122323 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala --- @@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate( "non-windowed GroupBy aggregation.") } +val isCountWindow = window match { + case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true + case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true + case _ => false +} + +if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) { + LOG.warn( --- End diff -- Should this be error() ? > Enable QueryConfig in count base GroupWindow > > > Key: FLINK-6583 > URL: https://issues.apache.org/jira/browse/FLINK-6583 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.0, 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.3.0, 1.4.0 > > > Enable QueryConfig in count base GroupWindow by Add a custom Trigger > `CountTriggerWithCleanupState`. See more in FLINK-6491. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6583) Enable QueryConfig in count base GroupWindow
[ https://issues.apache.org/jira/browse/FLINK-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014839#comment-16014839 ] ASF GitHub Bot commented on FLINK-6583: --- Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/3919#discussion_r117121382 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala --- @@ -245,6 +264,7 @@ object DataStreamGroupWindowAggregate { case SlidingGroupWindow(_, timeField, size, slide) if isProctimeAttribute(timeField) && isRowCountLiteral(size) => stream.countWindow(toLong(size), toLong(slide)) + .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide))); --- End diff -- Should this be toLong(size) ? > Enable QueryConfig in count base GroupWindow > > > Key: FLINK-6583 > URL: https://issues.apache.org/jira/browse/FLINK-6583 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.0, 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.3.0, 1.4.0 > > > Enable QueryConfig in count base GroupWindow by Add a custom Trigger > `CountTriggerWithCleanupState`. See more in FLINK-6491. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6065) Make TransportClient for ES5 pluggable
[ https://issues.apache.org/jira/browse/FLINK-6065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014817#comment-16014817 ] ASF GitHub Bot commented on FLINK-6065: --- GitHub user sschaef opened a pull request: https://github.com/apache/flink/pull/3934 [FLINK-6065] Add initClient method to ElasticsearchApiCallBridge This adds the method `initClient` to `ElasticsearchApiCallBridge` in order to resolve FLINK-6065. This new method takes as argument a function that can create a `TransportClient`. This is required in order to not hardcode the `TransportClient` in the implementation of `createClient`. `createClient` continues to exist in order to allow backwards compatibility. No tests provided because I couldn't find any existing test class that tests the implementation of `ElasticsearchApiCallBridge`. This is my first contribution, I haven't signed the ICLA yet (which I will do next). I didn't provide any tests yet because I had no idea how the functionality should be tested or where a test class should be added. If you would like to see tests for the changes, please tell me. The change fixes the ticket but maybe you would like to see a different way on how to fix the issue. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sschaef/flink flink-6065 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3934.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3934 commit 2815b0c9ca407838611ba60b9c35a0ac550005e3 Author: Simon SchäferDate: 2017-05-17T20:24:14Z [FLINK-6065] Add initClient method to ElasticsearchApiCallBridge This adds the method `initClient` to `ElasticsearchApiCallBridge` in order to resolve FLINK-6065. This new method takes as argument a function that can create a `TransportClient`. 
This is required in order to not hardcode the `TransportClient` in the implementation of `createClient`. `createClient` continues to exist in order to allow backwards compatibility. No tests provided because I couldn't find any existing test class that tests the implementation of `ElasticsearchApiCallBridge`. > Make TransportClient for ES5 pluggable > -- > > Key: FLINK-6065 > URL: https://issues.apache.org/jira/browse/FLINK-6065 > Project: Flink > Issue Type: Improvement > Components: ElasticSearch Connector, Streaming Connectors >Reporter: Robert Metzger > > This JIRA is based on a user request: > http://stackoverflow.com/questions/42807454/flink-xpack-elasticsearch-5-elasticsearchsecurityexception-missing-autentication?noredirect=1#comment72728053_42807454 > Currently, in the {{Elasticsearch5ApiCallBridge}} the > {{PreBuiltTransportClient}} is hardcoded. It would be nice to make this > client pluggable to allow using other clients such as the > {{PreBuiltXPackTransportClient}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014647#comment-16014647 ] Andrey commented on FLINK-6613: --- Hi Dmytro. Several problems still exist: 1) I want to fail fast and kill the JVM if it spends too much time doing GC, so we can't simply remove the "UseGCOverheadLimit" option. 2) Even if I remove this option, the root cause will still exist: the Kafka integration handles large messages incorrectly. It reads as many messages from Kafka as it can, and that leads to OOM. GC settings are irrelevant here, because these messages should not and will not be eligible for GC. 3) If you recommend G1, then the default startup scripts should be changed. Have you tried to reproduce the issue? > OOM during reading big messages from Kafka > -- > > Key: FLINK-6613 > URL: https://issues.apache.org/jira/browse/FLINK-6613 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.2.0 >Reporter: Andrey > > Steps to reproduce: > 1) Setup Task manager with 2G heap size > 2) Setup job that reads messages from Kafka 10 (i.e. FlinkKafkaConsumer010) > 3) Send 3300 messages each 635Kb. So total size is ~2G > 4) OOM in task manager. > According to heap dump: > 1) KafkaConsumerThread read messages with total size ~1G. > 2) Pass them to the next operator using > org.apache.flink.streaming.connectors.kafka.internal.Handover > 3) Then began to read another batch of messages. > 4) Task manager was able to read next batch of ~500Mb messages until OOM. > Expected: > 1) Either have constraint like "number of messages in-flight" OR > 2) Read next batch of messages only when previous batch processed OR > 3) Any other option which will solve OOM.
[jira] [Commented] (FLINK-6608) Relax Kerberos login contexts parsing by trimming whitespaces in contexts list
[ https://issues.apache.org/jira/browse/FLINK-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014506#comment-16014506 ] ASF GitHub Bot commented on FLINK-6608: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/3928#discussion_r117068349 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -230,10 +231,21 @@ private void validate() { } private static List parseList(String value) { - if(value == null) { + if(value == null || value.isEmpty()) { return Collections.emptyList(); } - return Arrays.asList(value.split(",")); --- End diff -- An alternative to consider: `Collections.list(new StringTokenizer(",X, ,,Y , Z,", ", "))` produces `[X, Y, Z]`. > Relax Kerberos login contexts parsing by trimming whitespaces in contexts list > -- > > Key: FLINK-6608 > URL: https://issues.apache.org/jira/browse/FLINK-6608 > Project: Flink > Issue Type: Improvement > Components: Configuration, Security >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Minor > Fix For: 1.3.0, 1.4.0 > > > The Kerberos login contexts list parsing right now isn't quite user-friendly. > The list must be provided as: {{security.kerberos.login.contexts: > Client,KafkaClient}}, without any whitespace in between. > We can relax this to be more user-friendly by trimming any whitespaces in the > list. > A user actually stumbled across this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problems-with-Kerberos-Kafka-connection-in-version-1-2-0-td12580.html#a12589 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
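The relaxation under discussion can be sketched as a split-and-trim variant of parseList (a simplified standalone version, not the actual SecurityUtils code), shown next to the StringTokenizer alternative from the review comment:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;
import java.util.stream.Collectors;

public class ParseListSketch {

    // split, then trim each entry and drop empty ones, so
    // "Client, KafkaClient" parses the same as "Client,KafkaClient"
    static List<String> parseList(String value) {
        if (value == null || value.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parseList("Client, KafkaClient")); // [Client, KafkaClient]

        // the StringTokenizer alternative from the review comment,
        // using both ',' and ' ' as delimiters
        ArrayList<Object> tokens = Collections.list(new StringTokenizer(",X, ,,Y , Z,", ", "));
        System.out.println(tokens); // [X, Y, Z]
    }
}
```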
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014378#comment-16014378 ] ASF GitHub Bot commented on FLINK-6606: --- GitHub user EronWright opened a pull request: https://github.com/apache/flink/pull/3933 [FLINK-6606] Create checkpoint hook with user classloader - wrap calls to MasterTriggerRestoreHook (and its factory) such that the user classloader is applied You can merge this pull request into a Git repository by running: $ git pull https://github.com/EronWright/flink FLINK-6606 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3933.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3933 commit fbf904a60a1e252944e1cbc7ad60c5d95ae28ec2 Author: Wright, Eron Date: 2017-05-17T16:46:13Z FLINK-6606 - wrap calls to MasterTriggerRestoreHook (and its factory) such that the user classloader is applied > Create checkpoint hook with user classloader > > > Key: FLINK-6606 > URL: https://issues.apache.org/jira/browse/FLINK-6606 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Blocker > Fix For: 1.3.0 > > > Flink should set the thread's classloader when calling the checkpoint hook > factory's `create` method. Without that, the hook is likely to fail during > initialization (e.g. using ServiceLoader).
[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014357#comment-16014357 ] Dmytro Shkvyra commented on FLINK-6613: --- Hi [~dernasherbrezon], I assume you are using ParallelGC. Please see https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/parallel.html {quote} The parallel collector throws an OutOfMemoryError if too much time is being spent in garbage collection (GC): If more than 98% of the total time is spent in garbage collection and less than 2% of the heap is recovered, then an OutOfMemoryError is thrown. This feature is designed to prevent applications from running for an extended period of time while making little or no progress because the heap is too small. If necessary, this feature can be disabled by adding the option -XX:-UseGCOverheadLimit to the command line. {quote} and if {quote} 3) Send 3300 messages each 635Kb. So total size is ~2G {quote} then ParallelGC can't collect all garbage in time. By the way, there are two parallel GC algorithms (http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/gc01/index.html), and the old one also cleans the old generation. I think we can close this ticket, because it can be solved by using G1 and is out of scope for Flink. > OOM during reading big messages from Kafka > -- > > Key: FLINK-6613 > URL: https://issues.apache.org/jira/browse/FLINK-6613 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.2.0 >Reporter: Andrey > > Steps to reproduce: > 1) Setup Task manager with 2G heap size > 2) Setup job that reads messages from Kafka 10 (i.e. FlinkKafkaConsumer010) > 3) Send 3300 messages each 635Kb. So total size is ~2G > 4) OOM in task manager. > According to heap dump: > 1) KafkaConsumerThread read messages with total size ~1G. > 2) Pass them to the next operator using > org.apache.flink.streaming.connectors.kafka.internal.Handover > 3) Then began to read another batch of messages. > 4) Task manager was able to read next batch of ~500Mb messages until OOM. > Expected: > 1) Either have constraint like "number of messages in-flight" OR > 2) Read next batch of messages only when previous batch processed OR > 3) Any other option which will solve OOM.
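Andrey's first suggested option (a bound on in-flight messages) can be sketched as a bounded hand-over between the fetch thread and the processing thread. This is a hypothetical illustration — the class name and shape do not match Flink's actual `org.apache.flink.streaming.connectors.kafka.internal.Handover`:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bounded hand-over between a fetch thread and a processing
// thread; illustrative only.
public class BoundedHandover<T> {

    private final BlockingQueue<T> queue;

    public BoundedHandover(int maxInFlight) {
        this.queue = new ArrayBlockingQueue<>(maxInFlight);
    }

    // called from the Kafka fetch thread; blocks once maxInFlight records
    // are buffered, so fetching cannot run arbitrarily ahead of processing
    public void produce(T record) throws InterruptedException {
        queue.put(record);
    }

    // called from the processing thread; taking a record frees capacity
    // for the fetcher to continue
    public T pollNext() throws InterruptedException {
        return queue.take();
    }
}
```

A byte-based bound (rather than a record count) would address varying message sizes more directly, but the blocking hand-off is the core of the backpressure idea.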
[GitHub] flink pull request #3929: [FLINK-6543] [table] Deprecate toDataStream
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3929#discussion_r117045701 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala --- @@ -144,13 +144,119 @@ class StreamTableEnvironment( * types: Fields are mapped by position, field types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param clazz The class of the type of the resulting [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + @Deprecated + def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz) + + /** +* Converts the given [[Table]] into an append [[DataStream]] of a specified type. +* +* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified +* by update or delete changes, the conversion will fail. +* +* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: +* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] +* types: Fields are mapped by position, field types must match. +* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. +* +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. 
+* +* @param table The [[Table]] to convert. +* @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = +toAppendStream(table, typeInfo) + + /** +* Converts the given [[Table]] into an append [[DataStream]] of a specified type. +* +* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified +* by update or delete changes, the conversion will fail. +* +* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: +* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] +* types: Fields are mapped by position, field types must match. +* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. +* +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param clazz The class of the type of the resulting [[DataStream]]. +* @param queryConfig The configuration of the query to generate. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. 
+*/ + def toDataStream[T]( + table: Table, + clazz: Class[T], + queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, clazz, queryConfig) + + /** +* Converts the given [[Table]] into an append [[DataStream]] of a specified type. +* +* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified +* by update or delete changes, the conversion will fail. +* +* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: +* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] +* types:
[GitHub] flink pull request #3929: [FLINK-6543] [table] Deprecate toDataStream
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3929#discussion_r117050100 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala --- @@ -144,13 +144,119 @@ class StreamTableEnvironment( * types: Fields are mapped by position, field types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param clazz The class of the type of the resulting [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + @Deprecated + def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz) --- End diff -- This method is still referenced from the `java` `package-info.java` file and the `java` `SqlITCase`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6543) Deprecate toDataStream
[ https://issues.apache.org/jira/browse/FLINK-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014352#comment-16014352 ] ASF GitHub Bot commented on FLINK-6543: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3929#discussion_r117045613 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala --- @@ -144,13 +144,119 @@ class StreamTableEnvironment( * types: Fields are mapped by position, field types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param clazz The class of the type of the resulting [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + @Deprecated + def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz) + + /** +* Converts the given [[Table]] into an append [[DataStream]] of a specified type. +* +* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified +* by update or delete changes, the conversion will fail. +* +* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: +* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] +* types: Fields are mapped by position, field types must match. +* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. 
+* +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = --- End diff -- add `@Deprecated` annotation > Deprecate toDataStream > -- > > Key: FLINK-6543 > URL: https://issues.apache.org/jira/browse/FLINK-6543 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > Fix For: 1.3.0 > > > With retraction support, we should deprecate {{toDataStream}} and introduce a > new {{toAppendStream}} to clearly differentiate between retraction and > non-retraction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3929: [FLINK-6543] [table] Deprecate toDataStream
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3929#discussion_r117045675 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala --- @@ -144,13 +144,119 @@ class StreamTableEnvironment( * types: Fields are mapped by position, field types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param clazz The class of the type of the resulting [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + @Deprecated + def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz) + + /** +* Converts the given [[Table]] into an append [[DataStream]] of a specified type. +* +* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified +* by update or delete changes, the conversion will fail. +* +* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: +* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] +* types: Fields are mapped by position, field types must match. +* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. +* +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. 
+* +* @param table The [[Table]] to convert. +* @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = +toAppendStream(table, typeInfo) + + /** +* Converts the given [[Table]] into an append [[DataStream]] of a specified type. +* +* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified +* by update or delete changes, the conversion will fail. +* +* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: +* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] +* types: Fields are mapped by position, field types must match. +* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. +* +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param clazz The class of the type of the resulting [[DataStream]]. +* @param queryConfig The configuration of the query to generate. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + def toDataStream[T]( --- End diff -- add `@Deprecated` annotation --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
---
[jira] [Commented] (FLINK-6543) Deprecate toDataStream
[ https://issues.apache.org/jira/browse/FLINK-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014353#comment-16014353 ] ASF GitHub Bot commented on FLINK-6543: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3929#discussion_r117050100 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala --- @@ -144,13 +144,119 @@ class StreamTableEnvironment( * types: Fields are mapped by position, field types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param clazz The class of the type of the resulting [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + @Deprecated + def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz) --- End diff -- This method is still referenced from the `java` `package-info.java` file and the `java` `SqlITCase`. > Deprecate toDataStream > -- > > Key: FLINK-6543 > URL: https://issues.apache.org/jira/browse/FLINK-6543 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > Fix For: 1.3.0 > > > With retraction support, we should deprecate {{toDataStream}} and introduce a > new {{toAppendStream}} to clearly differentiate between retraction and > non-retraction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6543) Deprecate toDataStream
[ https://issues.apache.org/jira/browse/FLINK-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014351#comment-16014351 ] ASF GitHub Bot commented on FLINK-6543: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3929#discussion_r117045701 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala --- @@ -144,13 +144,119 @@ class StreamTableEnvironment( * types: Fields are mapped by position, field types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param clazz The class of the type of the resulting [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + @Deprecated + def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz) + + /** +* Converts the given [[Table]] into an append [[DataStream]] of a specified type. +* +* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified +* by update or delete changes, the conversion will fail. +* +* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: +* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] +* types: Fields are mapped by position, field types must match. +* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. 
+* +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = +toAppendStream(table, typeInfo) + + /** +* Converts the given [[Table]] into an append [[DataStream]] of a specified type. +* +* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified +* by update or delete changes, the conversion will fail. +* +* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: +* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] +* types: Fields are mapped by position, field types must match. +* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. +* +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param clazz The class of the type of the resulting [[DataStream]]. +* @param queryConfig The configuration of the query to generate. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. 
+*/ + def toDataStream[T]( + table: Table, + clazz: Class[T], + queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, clazz, queryConfig) + + /** +* Converts the given [[Table]] into an append [[DataStream]] of a specified type. +* +* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified +* by update or delete
[jira] [Commented] (FLINK-6543) Deprecate toDataStream
[ https://issues.apache.org/jira/browse/FLINK-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014354#comment-16014354 ] ASF GitHub Bot commented on FLINK-6543: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3929#discussion_r117045675 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala --- @@ -144,13 +144,119 @@ class StreamTableEnvironment( * types: Fields are mapped by position, field types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param clazz The class of the type of the resulting [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + @Deprecated + def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz) + + /** +* Converts the given [[Table]] into an append [[DataStream]] of a specified type. +* +* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified +* by update or delete changes, the conversion will fail. +* +* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: +* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] +* types: Fields are mapped by position, field types must match. +* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. 
+* +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. +*/ + def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = +toAppendStream(table, typeInfo) + + /** +* Converts the given [[Table]] into an append [[DataStream]] of a specified type. +* +* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified +* by update or delete changes, the conversion will fail. +* +* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: +* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] +* types: Fields are mapped by position, field types must match. +* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. +* +* NOTE: This method only supports conversion of append-only tables. In order to make this +* more explicit in the future, please use [[toAppendStream()]] instead. +* If add and retract messages are required, use [[toRetractStream()]]. +* +* @param table The [[Table]] to convert. +* @param clazz The class of the type of the resulting [[DataStream]]. +* @param queryConfig The configuration of the query to generate. +* @tparam T The type of the resulting [[DataStream]]. +* @return The converted [[DataStream]]. +* @deprecated This method only supports conversion of append-only tables. In order to +*make this more explicit in the future, please use toAppendStream() instead. 
+*/ + def toDataStream[T]( --- End diff -- add `@Deprecated` annotation > Deprecate toDataStream > -- > > Key: FLINK-6543 > URL: https://issues.apache.org/jira/browse/FLINK-6543 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > Fix For:
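To illustrate the migration discussed in this thread, here is a minimal sketch of the before/after API use. This is a hedged illustration only: `tableEnv` and the append-only table `result` are assumed to exist, and the signatures follow the diff above (Flink 1.3-era Java `StreamTableEnvironment`).

```scala
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.types.Row

// Before (to be deprecated): works only for append-only tables.
val oldStream: DataStream[Row] = tableEnv.toDataStream(result, classOf[Row])

// After: the name makes the append-only restriction explicit.
val appendStream: DataStream[Row] = tableEnv.toAppendStream(result, classOf[Row])

// For tables with update/delete changes, use the retract variant instead;
// each element carries a flag marking it as an add or a retract message.
val retractStream = tableEnv.toRetractStream(result, classOf[Row])
```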
[jira] [Commented] (FLINK-6616) Clarify provenance of official Docker images
[ https://issues.apache.org/jira/browse/FLINK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014348#comment-16014348 ] ASF GitHub Bot commented on FLINK-6616: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/3932 [FLINK-6616] [docs] Clarify provenance of official Docker images Note that the official Docker images for Flink are community supported and not an official release of the Apache Flink PMC. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 6616_clarify_provenance_of_official_docker_images Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3932.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3932 commit 26914aa7f49d5e458ded35926aa944699821db5c Author: Greg Hogan Date: 2017-05-17T16:27:34Z [FLINK-6616] [docs] Clarify provenance of official Docker images Note that the official Docker images for Flink are community supported and not an official release of the Apache Flink PMC. > Clarify provenance of official Docker images > > > Key: FLINK-6616 > URL: https://issues.apache.org/jira/browse/FLINK-6616 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Critical > Fix For: 1.3.0 > > > Note that the official Docker images for Flink are community supported and > not an official release of the Apache Flink PMC. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3932: [FLINK-6616] [docs] Clarify provenance of official...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/3932 [FLINK-6616] [docs] Clarify provenance of official Docker images Note that the official Docker images for Flink are community supported and not an official release of the Apache Flink PMC. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 6616_clarify_provenance_of_official_docker_images Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3932.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3932 commit 26914aa7f49d5e458ded35926aa944699821db5c Author: Greg Hogan Date: 2017-05-17T16:27:34Z [FLINK-6616] [docs] Clarify provenance of official Docker images Note that the official Docker images for Flink are community supported and not an official release of the Apache Flink PMC. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6616) Clarify provenance of official Docker images
Greg Hogan created FLINK-6616: - Summary: Clarify provenance of official Docker images Key: FLINK-6616 URL: https://issues.apache.org/jira/browse/FLINK-6616 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.3.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Critical Fix For: 1.3.0 Note that the official Docker images for Flink are community supported and not an official release of the Apache Flink PMC. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6031) Add parameter for per job yarn clusters to control whether the user code jar is included into the system classloader.
[ https://issues.apache.org/jira/browse/FLINK-6031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014338#comment-16014338 ] ASF GitHub Bot commented on FLINK-6031: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3931 [FLINK-6031][yarn] Add config parameter for user-jar inclusion in cla… This PR adds a config parameter to control how user jars are handled with regard to the system class path for per-job YARN clusters. The parameter allows: * to disable the inclusion in the system classpath and use the user classpath instead ("DISABLE") * prepend the user jars to the system class path ("FIRST") * append the user jars to the system class path ("LAST") * (default) add the user jars to the system class path based on the lexicographic order ("ORDER") You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 6031_yarn_userjars Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3931.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3931 > Add parameter for per job yarn clusters to control whether the user code jar > is included into the system classloader. > - > > Key: FLINK-6031 > URL: https://issues.apache.org/jira/browse/FLINK-6031 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Robert Metzger >Assignee: Chesnay Schepler >Priority: Critical > > FLINK-4913 added the user jar into the system classloader, when starting a > Flink per job YARN cluster. > Some users were experiencing issues with the changed behavior. > I suggest to introduce a new yarn specific configuration parameter (for the > flink-conf.yaml file) to control if the user jar is added into system > classloader. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3931: [FLINK-6031][yarn] Add config parameter for user-j...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3931 [FLINK-6031][yarn] Add config parameter for user-jar inclusion in cla… This PR adds a config parameter to control how user jars are handled with regard to the system class path for per-job YARN clusters. The parameter allows: * to disable the inclusion in the system classpath and use the user classpath instead ("DISABLE") * prepend the user jars to the system class path ("FIRST") * append the user jars to the system class path ("LAST") * (default) add the user jars to the system class path based on the lexicographic order ("ORDER") You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 6031_yarn_userjars Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3931.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3931 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
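Based on the PR description above, the resulting entry in `flink-conf.yaml` would look roughly like the following. The key name is an assumption for illustration; only the four values are taken from the PR text.

```yaml
# Controls how user jars are placed on the system class path for
# per-job YARN clusters (key name assumed; values per the PR description):
#   ORDER (default) | FIRST | LAST | DISABLE
yarn.per-job-cluster.include-user-jar: ORDER
```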
[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014307#comment-16014307 ] Andrey commented on FLINK-6613: --- -Xmx2g and ParallelGC. Default for openjdk. > OOM during reading big messages from Kafka > -- > > Key: FLINK-6613 > URL: https://issues.apache.org/jira/browse/FLINK-6613 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.2.0 >Reporter: Andrey > > Steps to reproduce: > 1) Setup Task manager with 2G heap size > 2) Setup job that reads messages from Kafka 10 (i.e. FlinkKafkaConsumer010) > 3) Send 3300 messages each 635Kb. So total size is ~2G > 4) OOM in task manager. > According to heap dump: > 1) KafkaConsumerThread read messages with total size ~1G. > 2) Pass them to the next operator using > org.apache.flink.streaming.connectors.kafka.internal.Handover > 3) Then began to read another batch of messages. > 4) Task manager was able to read next batch of ~500Mb messages until OOM. > Expected: > 1) Either have constraint like "number of messages in-flight" OR > 2) Read next batch of messages only when previous batch processed OR > 3) Any other option which will solve OOM. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
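As a stop-gap until the consumer applies back-pressure to the fetcher, the memory a single fetch can buffer can be bounded through standard Kafka consumer properties. This is a hedged mitigation sketch, not the fix proposed in the issue; the broker address and sizes are illustrative.

```scala
import java.util.Properties

// Bound how much data one fetch can buffer, so ~635 KB messages cannot
// pile up to ~1 GB inside the consumer before the pipeline drains them.
// Property names are standard Kafka 0.10 consumer settings; values illustrative.
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")               // assumed address
props.setProperty("max.partition.fetch.bytes", (1024 * 1024).toString) // 1 MB per partition
props.setProperty("max.poll.records", "50")                            // cap records per poll
// props would then be handed to a FlinkKafkaConsumer010(topic, schema, props)
```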
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014288#comment-16014288 ] ASF GitHub Bot commented on FLINK-4565: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3502 Hi @DmytroShkvyra, I'm sorry for the long time without response on this PR. We're currently in "testing & bugfix mode" for the 1.3 release and I'll be gone for two weeks in a few days. I'm sorry, but I won't have much time to look at this PR until I return. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dmytro Shkvyra > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3502: [FLINK-4565] Support for SQL IN operator
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3502 Hi @DmytroShkvyra, I'm sorry for the long time without response on this PR. We're currently in "testing & bugfix mode" for the 1.3 release and I'll be gone for two weeks in a few days. I'm sorry, but I won't have much time to look at this PR until I return. ---
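For reference, the uncorrelated sub-query form of IN that this PR targets looks roughly like the following sketch. `tableEnv` and the registered tables `Orders` and `DiscountedProducts` are assumptions for illustration, not taken from the PR.

```scala
// Hedged sketch: uncorrelated sub-query IN on the SQL API
// (Flink 1.3-era `sql()` method; table and column names are made up).
val result = tableEnv.sql(
  """
    |SELECT o.orderId, o.amount
    |FROM Orders AS o
    |WHERE o.productId IN (SELECT productId FROM DiscountedProducts)
  """.stripMargin)
```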
[jira] [Commented] (FLINK-6611) When TaskManager or JobManager restart after crash the PID file contain also the old PID
[ https://issues.apache.org/jira/browse/FLINK-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014268#comment-16014268 ] Mauro Cortellazzi commented on FLINK-6611: -- Thank you [~greghogan] for the answer, i close the issue > When TaskManager or JobManager restart after crash the PID file contain also > the old PID > > > Key: FLINK-6611 > URL: https://issues.apache.org/jira/browse/FLINK-6611 > Project: Flink > Issue Type: Task > Components: Startup Shell Scripts >Reporter: Mauro Cortellazzi >Assignee: Mauro Cortellazzi >Priority: Trivial > > When TaskManager or JobManager restart after crash the PID file contain also > the old PID. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6611) When TaskManager or JobManager restart after crash the PID file contain also the old PID
[ https://issues.apache.org/jira/browse/FLINK-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mauro Cortellazzi closed FLINK-6611. Resolution: Not A Problem > When TaskManager or JobManager restart after crash the PID file contain also > the old PID > > > Key: FLINK-6611 > URL: https://issues.apache.org/jira/browse/FLINK-6611 > Project: Flink > Issue Type: Task > Components: Startup Shell Scripts >Reporter: Mauro Cortellazzi >Assignee: Mauro Cortellazzi >Priority: Trivial > > When TaskManager or JobManager restart after crash the PID file contain also > the old PID. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
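For background on why this was closed as "Not A Problem": the daemon scripts append one PID per started daemon, and readers of the file are expected to skip dead entries. A hedged sketch (the file layout is assumed, not taken from Flink's scripts) of filtering stale PIDs with `kill -0`:

```shell
#!/bin/sh
# Sketch: a .pid file may hold a stale PID after a crash; filter it out
# by probing each PID with signal 0 (existence check, sends no signal).
PIDFILE=$(mktemp)
sleep 0 &              # start and immediately reap a child process...
DEAD=$!
wait "$DEAD"           # ...so $DEAD is now a dead (stale) PID
printf '%s\n%s\n' "$DEAD" "$$" > "$PIDFILE"

LIVE=""
while read -r pid; do
  kill -0 "$pid" 2>/dev/null && LIVE="$LIVE $pid"
done < "$PIDFILE"

echo "live:$LIVE"      # only the running process remains
rm -f "$PIDFILE"
```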
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014246#comment-16014246 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r116992617 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema + +class DataStreamJoinRule + extends ConverterRule( + classOf[FlinkLogicalJoin], + FlinkConventions.LOGICAL, + FlinkConventions.DATASTREAM, + "DataStreamJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] + +val joinInfo = join.analyzeCondition + +// joins require at least one equi-condition +!joinInfo.pairs().isEmpty --- End diff -- We can also do the join without an equi-join condition. In this case, the join would run with parallelism 1. > Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 > URL: https://issues.apache.org/jira/browse/FLINK-6232 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime
>   FROM Orders AS o
>   JOIN Shipments AS s
>     ON o.orderId = s.orderId
>    AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition may only reference proctime, which is a system attribute, and must specify a bounded time range such as {{o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}. Unbounded conditions like {{o.proctime > s.proctime}} are not supported, and the condition must include the proctime attributes of both streams, so {{o.proctime BETWEEN proctime() AND proctime() + 1}} is not supported either.
> This issue includes:
> * Design of the DataStream operator that executes the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
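The bounded time condition described above constrains, for each row of one stream, which processing times of the other stream it can join with. A minimal sketch of that containment check, with hypothetical names and millisecond offsets standing in for the INTERVAL literals (this is illustration only, not Flink code):

```java
// Minimal sketch (hypothetical names, not Flink code) of the bounded time
// condition "o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR AND
// s.proctime + INTERVAL '1' HOUR", with the INTERVAL literals reduced to
// millisecond offsets relative to s.proctime.
final class BoundedTimeRange {
    final long lowerOffsetMs; // -3_600_000L for "- INTERVAL '1' HOUR"
    final long upperOffsetMs; //  3_600_000L for "+ INTERVAL '1' HOUR"

    BoundedTimeRange(long lowerOffsetMs, long upperOffsetMs) {
        this.lowerOffsetMs = lowerOffsetMs;
        this.upperOffsetMs = upperOffsetMs;
    }

    // true iff oTime lies within [sTime + lowerOffsetMs, sTime + upperOffsetMs]
    boolean joins(long oTime, long sTime) {
        return oTime >= sTime + lowerOffsetMs && oTime <= sTime + upperOffsetMs;
    }
}
```

Because both bounds are finite, each stream only ever needs to retain a bounded window of the other stream's history, which is what makes the join feasible on unbounded streams.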
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014248#comment-16014248 ]

ASF GitHub Bot commented on FLINK-6232:
---
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r116794639

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala ---
    @@ -0,0 +1,468 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.join
    +
    +import java.math.{BigDecimal => JBigDecimal}
    +import java.util
    +import java.util.EnumSet
    +
    +import org.apache.calcite.avatica.util.TimeUnit
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.JoinRelType
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable
    +import org.apache.calcite.sql.parser.SqlParserPos
    +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
    +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.table.api.{TableConfig, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
    +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.types.Row
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +
    +object JoinUtil {
    +
    +  /**
    +    * Analyze time-condtion to get time boundary for each stream and get the time type
    +    * and return condition without time-condition.
    +    *
    +    * @param condition      other condtion include time-condition
    +    * @param leftFieldCount left stream fields count
    +    * @param inputType      left and right connect stream type
    +    * @param rexBuilder     util to build rexNode
    +    * @param config         table environment config
    +    */
    +  private[flink] def analyzeTimeBoundary(
    +      condition: RexNode,
    +      leftLogicalFieldCnt: Int,
    +      leftPhysicalFieldCnt: Int,
    +      inputType: RelDataType,
    +      rexBuilder: RexBuilder,
    +      config: TableConfig): (RelDataType, Long, Long, RexNode) = {
    +    // analyze the time-conditon to get greate and less condition,
    +    // make sure left stream field in the left of the condition
    +    // e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
    +    val greateConditions = new util.ArrayList[TimeSingleCondition]()
    +    val lessConditions = new util.ArrayList[TimeSingleCondition]()
    +    analyzeTimeCondition(condition, greateConditions,
    +      lessConditions, leftLogicalFieldCnt, inputType)
    +    if (greateConditions.size != lessConditions.size
    +        || greateConditions.size > 1
    +        || greateConditions.size == 0) {
    +      throw TableException(
    +        "Equality join time conditon should have proctime or rowtime indicator."
    +      )
    +    }
    +
    +    val greatCond = greateConditions.get(0)
    +    val lessCond = lessConditions.get(0)
    +    if (greatCond.timeType != lessCond.timeType) {
    +      throw TableException(
    +        "Equality join time conditon should all use proctime or all use rowtime."
    +      )
    +    }
    +
    +    var leftStreamWindowSize: Long = 0
    +    var rightStreamWindowSize: Long = 0
    +
    +    // only a.proctime > b.proctime - interval '1' hour need to store a stream
    +    val timeLiteral: RexLiteral =
    +      reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
    +    leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
    +    // only need to store past records
    +    if (leftStreamWindowSize < 0) {
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r117033348

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala ---

    (quotes the same JoinUtil.scala excerpt as above, continuing with the window-size normalization:)

    +    // only need to store past records
    +    if (leftStreamWindowSize < 0) {
    +      leftStreamWindowSize = -leftStreamWindowSize
    +      if (!greatCond.isEqual) {
    +        leftStreamWindowSize -= 1
    +      }
    +    } else {
    +      leftStreamWindowSize = 0
    +    }
    +
    +    // only
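The window-size normalization in the quoted diff can be read as a small pure function. The following is a sketch under that reading, with a hypothetical helper name; it is not the actual Flink implementation:

```java
// Sketch (hypothetical helper, not Flink code) of the window-size
// normalization in the quoted diff.
final class WindowSizeUtil {
    private WindowSizeUtil() {}

    // A negative time boundary means the stream must retain that much history;
    // a strict (non-inclusive) comparison excludes the boundary row itself,
    // shrinking the retained window by one unit. A non-negative boundary means
    // no past records of this stream are needed at all.
    static long normalizeWindowSize(long boundary, boolean inclusive) {
        if (boundary < 0) {
            long size = -boundary;
            return inclusive ? size : size - 1;
        }
        return 0L;
    }
}
```

For example, a boundary of -3,600,000 ms from `a.proctime > b.proctime - INTERVAL '1' HOUR` yields a one-hour retention window, while a strict `>` on the same boundary keeps one unit less.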
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014251#comment-16014251 ]

ASF GitHub Bot commented on FLINK-6232:
---
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r116790511

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala ---

    (quoted diff identical to the JoinUtil.scala excerpt above, up to the scaladoc of `analyzeTimeBoundary`:)

    +    * @param condition      other condtion include time-condition
    +    * @param leftFieldCount left stream fields count
    --- End diff --

    -> `leftLogicalFieldCnt`
    Add `rightLogicalFieldCnt`
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014252#comment-16014252 ]

ASF GitHub Bot commented on FLINK-6232:
---
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r116795204

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala ---

    (quoted diff identical to the JoinUtil.scala excerpt above)
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014256#comment-16014256 ]

ASF GitHub Bot commented on FLINK-6232:
---
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r116854249

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala ---

    (quoted diff identical to the JoinUtil.scala excerpt above)
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r117033149

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala ---

    (quoted diff identical to the JoinUtil.scala excerpt above, including the window-size normalization)
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014247#comment-16014247 ]

ASF GitHub Bot commented on FLINK-6232:
---
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r116791975

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala ---

    (quoted diff identical to the JoinUtil.scala excerpt above, up to the size check:)

    +    if (greateConditions.size != lessConditions.size
    +        || greateConditions.size > 1
    --- End diff --

    `greaterConditions > 1 || greaterConditions == 0` can be replaced by `greaterConditions != 1`
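The suggested simplification is a plain boolean identity: for any non-negative collection size the two checks agree. The hypothetical helpers below exist only to illustrate the review comment:

```java
// The two validity checks from the review comment, as standalone predicates
// on a non-negative list size n (hypothetical helpers, illustration only).
final class SizeCheck {
    private SizeCheck() {}

    static boolean original(int n)   { return n > 1 || n == 0; }
    static boolean simplified(int n) { return n != 1; }
}
```

Both predicates signal the invalid case, i.e. they are true exactly when the number of conditions is not one.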
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014253#comment-16014253 ]

ASF GitHub Bot commented on FLINK-6232:
---
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r116791061

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala ---

    (quoted diff identical to the JoinUtil.scala excerpt above, up to:)

    +    val greateConditions = new util.ArrayList[TimeSingleCondition]()
    --- End diff --

    +r -> `greaterConditions`
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117006282 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,488 @@
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.runtime.FilterRunner +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze join condition to get equi-conditon and other condition +* @param joinNode logicaljoin node +* @param expression the function to generate condition string +*/ + private[flink] def analyzeJoinCondition( +joinNode: FlinkLogicalJoin, +expression: (RexNode, List[String], Option[List[RexNode]]) => String) = { + +val joinInfo = joinNode.analyzeCondition() +val keyPairs = joinInfo.pairs.toList +val otherCondition = + if(joinInfo.isEqui) null + else joinInfo.getRemaining(joinNode.getCluster.getRexBuilder) + +val leftKeys = ArrayBuffer.empty[Int] +val rightKeys = ArrayBuffer.empty[Int] +if (!keyPairs.isEmpty) { + val leftFields = joinNode.getLeft.getRowType.getFieldList + val rightFields = joinNode.getRight.getRowType.getFieldList + + keyPairs.foreach(pair => { +val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName +val rightKeyType = 
rightFields.get(pair.target).getType.getSqlTypeName + +// check if keys are compatible +if (leftKeyType == rightKeyType) { + // add key pair + leftKeys.append(pair.source) + rightKeys.append(pair.target) +} else { + throw TableException( +"Equality join predicate on incompatible types.\n" + + s"\tLeft: ${joinNode.getLeft.toString},\n" + + s"\tRight: ${joinNode.getRight.toString},\n" + + s"\tCondition: (${expression(joinNode.getCondition, +joinNode.getRowType.getFieldNames.toList, None)})" + ) +} + }) +} +(leftKeys.toArray, rightKeys.toArray, otherCondition) + } + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. +* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( --- End diff -- I think the logic of this function is correct. However, I find it a bit hard to follow because it starts with many conditions. What do you think about the following approach: 1. convert
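The key-pair compatibility check in the quoted `analyzeJoinCondition` boils down to comparing the SQL type names of each key pair and failing fast on a mismatch. A simplified stand-alone sketch, with plain strings standing in for Calcite's `SqlTypeName` (a hypothetical helper, not part of the PR):

```scala
import scala.collection.mutable.ArrayBuffer

object EquiKeyCheck {
  /** Collect left/right key indices for compatible pairs; throw on the first
    * type mismatch, mirroring the quoted analyzeJoinCondition logic. */
  def collectKeys(pairs: List[(Int, String, Int, String)]): (Array[Int], Array[Int]) = {
    val leftKeys = ArrayBuffer.empty[Int]
    val rightKeys = ArrayBuffer.empty[Int]
    pairs.foreach { case (lIdx, lType, rIdx, rType) =>
      if (lType == rType) {
        // compatible key pair: record both field indices
        leftKeys.append(lIdx)
        rightKeys.append(rIdx)
      } else {
        throw new IllegalArgumentException(
          s"Equality join predicate on incompatible types: $lType vs $rType")
      }
    }
    (leftKeys.toArray, rightKeys.toArray)
  }
}
```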
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117037366 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. +* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( --- End diff -- This method needs some good unit tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117027410 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. 
+* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, + leftLogicalFieldCnt: Int, + leftPhysicalFieldCnt: Int, + inputType: RelDataType, + rexBuilder: RexBuilder, + config: TableConfig): (RelDataType, Long, Long, RexNode) = { +// analyze the time-conditon to get greate and less condition, +// make sure left stream field in the left of the condition +// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime +val greateConditions = new util.ArrayList[TimeSingleCondition]() +val lessConditions = new util.ArrayList[TimeSingleCondition]() +analyzeTimeCondition(condition, greateConditions, + lessConditions, leftLogicalFieldCnt, inputType) +if (greateConditions.size != lessConditions.size +|| greateConditions.size > 1 +|| greateConditions.size == 0) { + throw TableException( +"Equality join time conditon should have proctime or rowtime indicator." + ) +} + +val greatCond = greateConditions.get(0) +val lessCond = lessConditions.get(0) +if (greatCond.timeType != lessCond.timeType) { + throw TableException( +"Equality join time conditon should all use proctime or all use rowtime." + ) +} + +var leftStreamWindowSize: Long = 0 +var rightStreamWindowSize: Long = 0 + +// only a.proctime > b.proctime - interval '1' hour need to store a stream +val timeLiteral: RexLiteral = +reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config) +leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long] +// only need to store past records +if (leftStreamWindowSize < 0) { + leftStreamWindowSize = -leftStreamWindowSize + if (!greatCond.isEqual) { +leftStreamWindowSize -= 1 + } +} else { + leftStreamWindowSize = 0 +} + +// only
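The window-size derivation in the quoted code can be isolated into a pure function. This sketch (a hypothetical helper, not part of the PR) mirrors the quoted branches: a negative reduced time literal means past records of the stream must be retained, and a strict `>` (i.e. `isEqual == false`) shrinks the retention bound by one millisecond:

```scala
object WindowSizeSketch {
  /** literalMs: reduced time difference in milliseconds between the two time
    * attributes; isEqual: true for >=, false for a strict >. */
  def streamWindowSize(literalMs: Long, isEqual: Boolean): Long =
    if (literalMs < 0) {
      // Negative bound: past records within |literalMs| ms are needed.
      val size = -literalMs
      if (!isEqual) size - 1 else size
    } else {
      // Non-negative bound: no past records need to be stored.
      0L
    }

  def main(args: Array[String]): Unit = {
    // a.proctime >  b.proctime - INTERVAL '1' HOUR  =>  3599999 ms
    println(streamWindowSize(-3600000L, isEqual = false))
    // a.proctime >= b.proctime - INTERVAL '1' HOUR  =>  3600000 ms
    println(streamWindowSize(-3600000L, isEqual = true))
  }
}
```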
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014262#comment-16014262 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117033348 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. 
+* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, + leftLogicalFieldCnt: Int, + leftPhysicalFieldCnt: Int, + inputType: RelDataType, + rexBuilder: RexBuilder, + config: TableConfig): (RelDataType, Long, Long, RexNode) = { +// analyze the time-conditon to get greate and less condition, +// make sure left stream field in the left of the condition +// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime +val greateConditions = new util.ArrayList[TimeSingleCondition]() +val lessConditions = new util.ArrayList[TimeSingleCondition]() +analyzeTimeCondition(condition, greateConditions, + lessConditions, leftLogicalFieldCnt, inputType) +if (greateConditions.size != lessConditions.size +|| greateConditions.size > 1 +|| greateConditions.size == 0) { + throw TableException( +"Equality join time conditon should have proctime or rowtime indicator." + ) +} + +val greatCond = greateConditions.get(0) +val lessCond = lessConditions.get(0) +if (greatCond.timeType != lessCond.timeType) { + throw TableException( +"Equality join time conditon should all use proctime or all use rowtime." + ) +} + +var leftStreamWindowSize: Long = 0 +var rightStreamWindowSize: Long = 0 + +// only a.proctime > b.proctime - interval '1' hour need to store a stream +val timeLiteral: RexLiteral = +reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config) +leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long] +// only need to store past records +if (leftStreamWindowSize < 0) {
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014236#comment-16014236 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r116797215 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. 
+* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, + leftLogicalFieldCnt: Int, + leftPhysicalFieldCnt: Int, + inputType: RelDataType, + rexBuilder: RexBuilder, + config: TableConfig): (RelDataType, Long, Long, RexNode) = { +// analyze the time-conditon to get greate and less condition, +// make sure left stream field in the left of the condition +// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime +val greateConditions = new util.ArrayList[TimeSingleCondition]() +val lessConditions = new util.ArrayList[TimeSingleCondition]() +analyzeTimeCondition(condition, greateConditions, + lessConditions, leftLogicalFieldCnt, inputType) +if (greateConditions.size != lessConditions.size +|| greateConditions.size > 1 +|| greateConditions.size == 0) { + throw TableException( +"Equality join time conditon should have proctime or rowtime indicator." + ) +} + +val greatCond = greateConditions.get(0) +val lessCond = lessConditions.get(0) +if (greatCond.timeType != lessCond.timeType) { + throw TableException( +"Equality join time conditon should all use proctime or all use rowtime." + ) +} + +var leftStreamWindowSize: Long = 0 +var rightStreamWindowSize: Long = 0 + +// only a.proctime > b.proctime - interval '1' hour need to store a stream +val timeLiteral: RexLiteral = +reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config) +leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long] +// only need to store past records +if (leftStreamWindowSize < 0) {
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014264#comment-16014264 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117004436 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. +* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, --- End diff -- I think this would make the code in this class a lot simpler, because we would not need to recursively dig into the condition. We can iterate over all conjunctive conditions and check for each if it is a valid time bound condition and either remove it or not. 
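The restructuring sketched in the review comment above — normalize the predicate, then walk the top-level conjuncts and either consume each one as a time bound or keep it as a remaining condition — can be illustrated on a toy AST. `TimeBound` and `Remaining` are hypothetical stand-ins for classified `RexNode` conjuncts, not Calcite types:

```scala
object ConjunctClassification {
  sealed trait Conjunct
  final case class TimeBound(expr: String) extends Conjunct
  final case class Remaining(expr: String) extends Conjunct

  /** Split the conjuncts of a (CNF-normalized) join predicate into time-bound
    * conditions and everything else, without recursing into the tree. */
  def classify(conjuncts: List[Conjunct]): (List[TimeBound], List[Remaining]) =
    conjuncts.foldRight((List.empty[TimeBound], List.empty[Remaining])) {
      case (t: TimeBound, (ts, rs)) => (t :: ts, rs)
      case (r: Remaining, (ts, rs)) => (ts, r :: rs)
    }

  def main(args: Array[String]): Unit = {
    val (bounds, rest) = classify(List(
      TimeBound("o.proctime >= s.proctime - INTERVAL '1' HOUR"),
      TimeBound("o.proctime <= s.proctime + INTERVAL '1' HOUR"),
      Remaining("o.orderId = s.orderId")))
    println(s"${bounds.size} time bounds, ${rest.size} remaining")
  }
}
```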
> Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 > URL: https://issues.apache.org/jira/browse/FLINK-6232 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only support inner join > * The ON clause should include equi-join condition > * The time-condition {{o.proctime BETWEEN s.proctime AND s.proctime + > INTERVAL '1' HOUR}} only can use proctime that is a system attribute, the > time condition only support bounded time range like {{o.proctime BETWEEN > s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not > support unbounded like {{o.proctime > s.protime}}, and should include both > two stream's proctime attribute, {{o.proctime between proctime() and > proctime() + 1}} should also not be supported. > This issue includes: > *
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117002279 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. +* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, --- End diff -- As a first step, the `condition` should be converted into CNF (conjunctive normal form) for normalization. Calcite offers the `RexUtil.toCnf()` method for that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117031586 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. 
+* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, + leftLogicalFieldCnt: Int, + leftPhysicalFieldCnt: Int, + inputType: RelDataType, + rexBuilder: RexBuilder, + config: TableConfig): (RelDataType, Long, Long, RexNode) = { +// analyze the time-conditon to get greate and less condition, +// make sure left stream field in the left of the condition +// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime +val greateConditions = new util.ArrayList[TimeSingleCondition]() +val lessConditions = new util.ArrayList[TimeSingleCondition]() +analyzeTimeCondition(condition, greateConditions, + lessConditions, leftLogicalFieldCnt, inputType) +if (greateConditions.size != lessConditions.size +|| greateConditions.size > 1 +|| greateConditions.size == 0) { + throw TableException( +"Equality join time conditon should have proctime or rowtime indicator." + ) +} + +val greatCond = greateConditions.get(0) +val lessCond = lessConditions.get(0) +if (greatCond.timeType != lessCond.timeType) { + throw TableException( +"Equality join time conditon should all use proctime or all use rowtime." + ) +} + +var leftStreamWindowSize: Long = 0 +var rightStreamWindowSize: Long = 0 + +// only a.proctime > b.proctime - interval '1' hour need to store a stream +val timeLiteral: RexLiteral = +reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config) +leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long] +// only need to store past records +if (leftStreamWindowSize < 0) { + leftStreamWindowSize = -leftStreamWindowSize + if (!greatCond.isEqual) { +leftStreamWindowSize -= 1 + } +} else { + leftStreamWindowSize = 0 +} + +// only
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014258#comment-16014258 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117033149 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- (quoted diff context omitted; identical to the snippet in the first message)
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014254#comment-16014254 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117001241 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- (quoted diff context omitted; identical to the snippet in the first message)
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014260#comment-16014260 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117031586 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- (quoted diff context omitted; identical to the snippet in the first message)
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014255#comment-16014255 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r116792128 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- (quoted diff context omitted; identical to the snippet in the first message, up to:) +if (greateConditions.size != lessConditions.size +|| greateConditions.size > 1 --- End diff -- The whole condition can be changed to `lessConditions.size != 1 || greaterConditions.size != 1`
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014259#comment-16014259 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117006282 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,488 @@ (Apache License header omitted; identical to the first message) +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.runtime.FilterRunner +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze the join condition to get the equi-join condition and the remaining condition. +* @param joinNode logical join node +* @param expression the function to generate the condition string +*/ + private[flink] def analyzeJoinCondition( +joinNode: FlinkLogicalJoin, +expression: (RexNode, List[String], Option[List[RexNode]]) => String) = { + +val joinInfo = joinNode.analyzeCondition() +val keyPairs = joinInfo.pairs.toList +val otherCondition = + if (joinInfo.isEqui) null + else joinInfo.getRemaining(joinNode.getCluster.getRexBuilder) + +val leftKeys = ArrayBuffer.empty[Int] +val rightKeys = ArrayBuffer.empty[Int] +if (!keyPairs.isEmpty) { + val leftFields = joinNode.getLeft.getRowType.getFieldList + val rightFields = joinNode.getRight.getRowType.getFieldList + + keyPairs.foreach(pair => { +val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName +val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName + +// check if keys are compatible +if (leftKeyType == rightKeyType) { + // add key pair + leftKeys.append(pair.source) + rightKeys.append(pair.target) +} else { + throw TableException( +"Equality join predicate on incompatible types.\n" + + s"\tLeft: ${joinNode.getLeft.toString},\n" + + s"\tRight: ${joinNode.getRight.toString},\n" + + s"\tCondition: (${expression(joinNode.getCondition, +joinNode.getRowType.getFieldNames.toList, None)})" + ) +} + }) +} +(leftKeys.toArray, rightKeys.toArray, otherCondition) + } + + /** +* Analyze the time condition to get the time boundary for each stream, determine the time type, +* and return the condition without the time condition. +* +* @param condition remaining condition, including the time condition +* @param leftFieldCount number of fields of the left stream +* @param inputType row type of the connected left and right streams +* @param rexBuilder utility to build RexNodes +* @param config table environment config +*/ + private[flink] def
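The key-pair extraction in `analyzeJoinCondition` above boils down to: for each equi-join pair, compare the SQL type names of the two sides and collect the field indices only when they match, otherwise reject the join. A minimal standalone sketch under these assumptions (plain arrays stand in for Calcite's `JoinInfo` and field lists):

```java
import java.util.ArrayList;
import java.util.List;

class EquiKeySketch {
    // Collect compatible (leftIndex, rightIndex) key pairs; reject the join
    // if any pair compares fields of different SQL types, as the quoted code does.
    static int[][] extractKeys(int[][] pairs, String[] leftTypes, String[] rightTypes) {
        List<int[]> keys = new ArrayList<>();
        for (int[] p : pairs) {
            String leftType = leftTypes[p[0]];
            String rightType = rightTypes[p[1]];
            if (!leftType.equals(rightType)) {
                throw new IllegalArgumentException(
                    "Equality join predicate on incompatible types: "
                        + leftType + " vs " + rightType);
            }
            keys.add(new int[] {p[0], p[1]});
        }
        return keys.toArray(new int[0][]);
    }
}
```

Note that, like the original, this only accepts exact type-name equality; it does not attempt implicit casts between numeric types.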
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014261#comment-16014261 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117037366 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- (quoted diff context omitted; identical to the snippet in the first message, up to:) + private[flink] def analyzeTimeBoundary( --- End diff -- This method needs some good unit tests.
> Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 > URL: https://issues.apache.org/jira/browse/FLINK-6232 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-joins on proctime > streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * Only inner joins are supported > * The ON clause must include an equi-join condition > * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + > INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The > time condition only supports a bounded time range like {{o.proctime BETWEEN > s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not an > unbounded one like {{o.proctime > s.proctime}}, and must include the proctime > attributes of both streams; {{o.proctime BETWEEN proctime() AND > proctime() + 1}} should also not be supported. > This issue includes: > * Design of the DataStream operator to deal with stream joins > * Translation from Calcite's RelNode representation (LogicalJoin). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
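The BETWEEN predicate in the issue description defines a bounded time range per pair of records. The intended semantics can be sketched as a naive batch nested-loop join (illustration only; the actual implementation is a streaming two-input operator that retains only the bounded windows derived above):

```java
import java.util.ArrayList;
import java.util.List;

class BoundedJoinSketch {
    // Naive batch version of:
    //   o JOIN s ON o.key = s.key
    //   AND o.time BETWEEN s.time - lowerMs AND s.time + upperMs
    // Each input element is {key, timeMillis}; each output row is
    // {key, orderTime, shipTime}.
    static List<long[]> join(long[][] orders, long[][] ships, long lowerMs, long upperMs) {
        List<long[]> out = new ArrayList<>();
        for (long[] o : orders) {
            for (long[] s : ships) {
                boolean sameKey = o[0] == s[0];
                boolean inRange = o[1] >= s[1] - lowerMs && o[1] <= s[1] + upperMs;
                if (sameKey && inRange) {
                    out.add(new long[] {o[0], o[1], s[1]});
                }
            }
        }
        return out;
    }
}
```

Because both bounds are finite, each record only ever matches records within a fixed time span of itself, which is what makes the state of the streaming operator bounded.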
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014239#comment-16014239 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r116861487 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { --- End diff -- more inline comments would be good > Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 > URL: https://issues.apache.org/jira/browse/FLINK-6232 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only support inner join > * The ON clause should include equi-join condition > * The time-condition {{o.proctime BETWEEN s.proctime AND s.proctime + > INTERVAL '1' HOUR}} only can use proctime that is a system attribute, the > time condition only support bounded time range like {{o.proctime BETWEEN > s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not > support unbounded like {{o.proctime > s.protime}}, and should include both > two stream's proctime attribute, {{o.proctime between proctime() and > proctime() + 1}} should also not be supported. > This issue includes: > * Design of the DataStream operator to deal with stream join > * Translation from Calcite's RelNode representation (LogicalJoin). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
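The bounded time-range restriction the issue describes can be illustrated with a minimal sketch, independent of Flink's actual operator implementation. This is an assumption-laden toy model: records are plain `(key, timestamp_ms)` pairs held in lists, and the predicate mirrors `o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR`; a real streaming operator would instead keep per-key state and evict records that fall out of the window.

```python
# Toy model of the bounded-range equi-join from the issue description.
# Field names and the in-memory lists are illustrative only.

WINDOW_MS = 60 * 60 * 1000  # INTERVAL '1' HOUR

def interval_join(orders, shipments, window_ms=WINDOW_MS):
    """orders/shipments: iterables of (key, timestamp_ms) pairs.

    Emits (key, order_ts, shipment_ts) when the keys match and the order
    timestamp lies in [shipment_ts, shipment_ts + window_ms], i.e. the
    bounded time range the issue requires (unbounded ranges like
    o.proctime > s.proctime are deliberately not expressible here).
    """
    results = []
    for o_key, o_ts in orders:
        for s_key, s_ts in shipments:
            # equi-join condition on the key, bounded range on the time attribute
            if o_key == s_key and s_ts <= o_ts <= s_ts + window_ms:
                results.append((o_key, o_ts, s_ts))
    return results
```

The nested loop stands in for what the DataStream operator mentioned at the end of the issue would do incrementally over two keyed streams.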
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r116791596 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. 
+* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, + leftLogicalFieldCnt: Int, + leftPhysicalFieldCnt: Int, + inputType: RelDataType, + rexBuilder: RexBuilder, + config: TableConfig): (RelDataType, Long, Long, RexNode) = { +// analyze the time-conditon to get greate and less condition, +// make sure left stream field in the left of the condition +// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime +val greateConditions = new util.ArrayList[TimeSingleCondition]() +val lessConditions = new util.ArrayList[TimeSingleCondition]() +analyzeTimeCondition(condition, greateConditions, --- End diff -- Please wrap the arguments of a function as follows (if it does not fit in a single line): ``` analyzeTimeCondition( condition, greateConditions, lessConditions, leftLogicalFieldCnt, inputType) ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014263#comment-16014263 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117027410 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. 
+* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, + leftLogicalFieldCnt: Int, + leftPhysicalFieldCnt: Int, + inputType: RelDataType, + rexBuilder: RexBuilder, + config: TableConfig): (RelDataType, Long, Long, RexNode) = { +// analyze the time-conditon to get greate and less condition, +// make sure left stream field in the left of the condition +// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime +val greateConditions = new util.ArrayList[TimeSingleCondition]() +val lessConditions = new util.ArrayList[TimeSingleCondition]() +analyzeTimeCondition(condition, greateConditions, + lessConditions, leftLogicalFieldCnt, inputType) +if (greateConditions.size != lessConditions.size +|| greateConditions.size > 1 +|| greateConditions.size == 0) { + throw TableException( +"Equality join time conditon should have proctime or rowtime indicator." + ) +} + +val greatCond = greateConditions.get(0) +val lessCond = lessConditions.get(0) +if (greatCond.timeType != lessCond.timeType) { + throw TableException( +"Equality join time conditon should all use proctime or all use rowtime." + ) +} + +var leftStreamWindowSize: Long = 0 +var rightStreamWindowSize: Long = 0 + +// only a.proctime > b.proctime - interval '1' hour need to store a stream +val timeLiteral: RexLiteral = +reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config) +leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long] +// only need to store past records +if (leftStreamWindowSize < 0) {
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014249#comment-16014249 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r116794136 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. 
+* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, + leftLogicalFieldCnt: Int, + leftPhysicalFieldCnt: Int, + inputType: RelDataType, + rexBuilder: RexBuilder, + config: TableConfig): (RelDataType, Long, Long, RexNode) = { +// analyze the time-conditon to get greate and less condition, +// make sure left stream field in the left of the condition +// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime +val greateConditions = new util.ArrayList[TimeSingleCondition]() +val lessConditions = new util.ArrayList[TimeSingleCondition]() +analyzeTimeCondition(condition, greateConditions, + lessConditions, leftLogicalFieldCnt, inputType) +if (greateConditions.size != lessConditions.size +|| greateConditions.size > 1 +|| greateConditions.size == 0) { + throw TableException( +"Equality join time conditon should have proctime or rowtime indicator." + ) +} + +val greatCond = greateConditions.get(0) +val lessCond = lessConditions.get(0) +if (greatCond.timeType != lessCond.timeType) { + throw TableException( +"Equality join time conditon should all use proctime or all use rowtime." + ) +} + +var leftStreamWindowSize: Long = 0 +var rightStreamWindowSize: Long = 0 + +// only a.proctime > b.proctime - interval '1' hour need to store a stream +val timeLiteral: RexLiteral = +reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config) +leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long] +// only need to store past records +if (leftStreamWindowSize < 0) {
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014241#comment-16014241 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r116796524 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. 
+* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, + leftLogicalFieldCnt: Int, + leftPhysicalFieldCnt: Int, + inputType: RelDataType, + rexBuilder: RexBuilder, + config: TableConfig): (RelDataType, Long, Long, RexNode) = { +// analyze the time-conditon to get greate and less condition, +// make sure left stream field in the left of the condition +// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime +val greateConditions = new util.ArrayList[TimeSingleCondition]() +val lessConditions = new util.ArrayList[TimeSingleCondition]() +analyzeTimeCondition(condition, greateConditions, + lessConditions, leftLogicalFieldCnt, inputType) +if (greateConditions.size != lessConditions.size +|| greateConditions.size > 1 +|| greateConditions.size == 0) { + throw TableException( +"Equality join time conditon should have proctime or rowtime indicator." + ) +} + +val greatCond = greateConditions.get(0) +val lessCond = lessConditions.get(0) +if (greatCond.timeType != lessCond.timeType) { + throw TableException( +"Equality join time conditon should all use proctime or all use rowtime." + ) +} + +var leftStreamWindowSize: Long = 0 +var rightStreamWindowSize: Long = 0 + +// only a.proctime > b.proctime - interval '1' hour need to store a stream +val timeLiteral: RexLiteral = +reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config) +leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long] +// only need to store past records +if (leftStreamWindowSize < 0) {
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117004436 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. +* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, --- End diff -- I think this would make the code in this class a lot simpler, because we would not need to recursively dig into the condition. We can iterate over all conjunctive conditions and check for each if it is a valid time bound condition and either remove it or not. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
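The conjunct-scanning approach the reviewer suggests above can be sketched without Calcite's `RexNode` API. In this hypothetical simplified model, predicates are plain strings and a conjunct counts as a time condition when it references the time attribute of both inputs; the real implementation would inspect typed `RexCall` trees after `RexUtil.toCnf()` normalization.

```python
# Simplified model of "iterate over all conjunctive conditions and check for
# each if it is a valid time bound condition and either remove it or not".
# String matching stands in for real expression analysis.

def split_time_conditions(cnf_conjuncts, time_attrs):
    """Partition CNF conjuncts into time-bound predicates and the rest."""
    time_conds, other_conds = [], []
    for conjunct in cnf_conjuncts:
        # treat a conjunct as a time condition if it mentions the time
        # attribute of both streams (e.g. the proctime of each side)
        if all(attr in conjunct for attr in time_attrs):
            time_conds.append(conjunct)
        else:
            other_conds.append(conjunct)
    return time_conds, other_conds

conjuncts = [
    "o.orderId = s.orderId",
    "o.proctime >= s.proctime",
    "o.proctime <= s.proctime + INTERVAL '1' HOUR",
]
time_conds, rest = split_time_conditions(conjuncts, ("o.proctime", "s.proctime"))
```

This avoids the recursive descent into the condition tree that the reviewer flags: after CNF normalization, a single flat pass over the conjuncts suffices.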
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r116790511 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. +* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count --- End diff -- -> `leftLogicalFieldCnt` Add `rightLogicalFieldCnt` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014235#comment-16014235 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r116993635 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema + +class DataStreamJoinRule + extends ConverterRule( + classOf[FlinkLogicalJoin], + FlinkConventions.LOGICAL, + FlinkConventions.DATASTREAM, + "DataStreamJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] + +val joinInfo = join.analyzeCondition --- End diff -- I think it makes sense to separate different join cases on the level of plan operators. That would mean we would check here if the join has bounded time predicates (for example `left.rowtime BETWEEN right.rowtime - INTERVAL '1' MINUTE AND right.rowtime + INTERVAL '1' MINUTE`). If this is the case, we would extract the relevant time predicates and create a stream-stream join RelNode. This would move a bit of code out of the `DataStreamJoin` into the rule. IMO, the benefit is that it will be easier to add other joins because we only need to add a new rule, a new RelNode and runtime code. Hence, we would not need to touch the existing join strategy. What do you think about this @hongyuhong? > Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 > URL: https://issues.apache.org/jira/browse/FLINK-6232 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. 
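The rule-level separation proposed above (one planner rule per join flavor, each with its own `matches()` check) can be sketched abstractly. Names and the dictionary-based join description here are illustrative only and do not correspond to actual Flink or Calcite classes.

```python
# Hypothetical sketch of per-flavor join rules: adding a new join strategy
# means registering one more rule, not extending a monolithic operator.

class Rule:
    def __init__(self, name, matches):
        self.name = name
        self.matches = matches  # predicate: join description -> bool

def has_bounded_time_predicate(join):
    # the windowed stream-stream translation applies only when both a
    # lower and an upper time bound were extracted from the condition
    return join.get("lower_bound") is not None and join.get("upper_bound") is not None

RULES = [
    Rule("DataStreamWindowJoinRule", has_bounded_time_predicate),
    Rule("DataStreamJoinRule", lambda join: True),  # fallback strategy
]

def select_rule(join):
    """Return the name of the first rule whose matches() accepts the join."""
    for rule in RULES:
        if rule.matches(join):
            return rule.name
    raise ValueError("no applicable rule")
```

The benefit the reviewer names falls out directly: each `matches()` is checked in isolation, so existing strategies stay untouched when a new rule is appended.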
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014257#comment-16014257 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r117002279 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze time-condtion to get time boundary for each stream and get the time type +* and return condition without time-condition. +* +* @param condition other condtion include time-condition +* @param leftFieldCount left stream fields count +* @param inputType left and right connect stream type +* @param rexBuilder util to build rexNode +* @param config table environment config +*/ + private[flink] def analyzeTimeBoundary( + condition: RexNode, --- End diff -- As a first step, the `condition` should be converted into CNF (conjunctive normal form) for normalization. Calcite offers the `RexUtil.toCnf()` method for that. 
> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: hongyuhong
> Assignee: hongyuhong
>
> The goal of this issue is to add support for an inner equi-join on proctime streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR}} may only use proctime, which is a system attribute. The time condition must be a bounded time range such as {{o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; unbounded conditions like {{o.proctime > s.proctime}} are not supported. The condition must include the proctime attributes of both streams, so {{o.proctime BETWEEN proctime() AND proctime() + 1}} is also not supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode
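The BETWEEN clause in the example query defines a bounded time window per record pair. The predicate it implies can be sketched on plain millisecond timestamps as follows (the class and method names are illustrative, not Flink API):

```java
// Sketch of the time predicate implied by
//   o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR
// evaluated on millisecond timestamps.
public class BoundedJoinPredicate {
    static final long ONE_HOUR_MS = 60 * 60 * 1000L;

    // true iff the order record may join the shipment record
    static boolean withinWindow(long orderTimeMs, long shipTimeMs) {
        return orderTimeMs >= shipTimeMs && orderTimeMs <= shipTimeMs + ONE_HOUR_MS;
    }

    public static void main(String[] args) {
        System.out.println(withinWindow(1_000L, 500L));                  // within the hour
        System.out.println(withinWindow(500L + ONE_HOUR_MS + 1, 500L));  // just outside
    }
}
```

Because the range is bounded on both sides, each operator instance only needs to retain records for a finite window, which is what makes the join feasible on unbounded streams.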
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014250#comment-16014250 ]

ASF GitHub Bot commented on FLINK-6232:
---
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r116792670

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala ---
@@ -0,0 +1,468 @@
[license header, imports, and scaladoc elided — identical to the first quoted diff]
+  private[flink] def analyzeTimeBoundary(
+      condition: RexNode,
+      leftLogicalFieldCnt: Int,
+      leftPhysicalFieldCnt: Int,
+      inputType: RelDataType,
+      rexBuilder: RexBuilder,
+      config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+    // analyze the time condition to get the greater and less conditions,
+    // making sure the left stream field is on the left side of the condition,
+    // e.g. b.proctime > a.proctime - 1 will be translated to a.proctime - 1 < b.proctime
+    val greateConditions = new util.ArrayList[TimeSingleCondition]()
+    val lessConditions = new util.ArrayList[TimeSingleCondition]()
+    analyzeTimeCondition(condition, greateConditions,
+      lessConditions, leftLogicalFieldCnt, inputType)
+    if (greateConditions.size != lessConditions.size
+        || greateConditions.size > 1
+        || greateConditions.size == 0) {
+      throw TableException(
+        "Equality join time condition should have proctime or rowtime indicator."
+      )
+    }
+
+    val greatCond = greateConditions.get(0)
+    val lessCond = lessConditions.get(0)
+    if (greatCond.timeType != lessCond.timeType) {
+      throw TableException(
+        "Equality join time condition should all use proctime or all use rowtime."
+      )
+    }
+
+    var leftStreamWindowSize: Long = 0
+    var rightStreamWindowSize: Long = 0
+
+    // only a.proctime > b.proctime - interval '1' hour needs to store the a stream
+    val timeLiteral: RexLiteral =
+      reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
+    leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+    // only need to store past records
+    if (leftStreamWindowSize < 0) {
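The quoted code reduces the boundary expression to a single time literal and derives a window size from its sign: a negative bound means past records must be retained. A hedged sketch of that sign handling, with illustrative names rather than the actual Flink internals:

```java
// Sketch of deriving the left stream's window size from the reduced boundary
// literal, mirroring the sign handling in the quoted analyzeTimeBoundary.
public class WindowSizeSketch {
    // boundMs: value of the reduced time literal (milliseconds)
    // strict:  true when the comparison was strict (">" rather than ">=")
    static long leftWindowSize(long boundMs, boolean strict) {
        if (boundMs < 0) {
            long size = -boundMs;
            if (strict) {
                size -= 1; // a strict bound excludes the record exactly on the edge
            }
            return size;
        }
        return 0; // bound lies in the future: no past records to retain
    }

    public static void main(String[] args) {
        System.out.println(leftWindowSize(-3_600_000L, false)); // keep one hour of records
        System.out.println(leftWindowSize(5_000L, false));      // nothing to keep
    }
}
```

The same computation is repeated for the right stream with the roles of the bounds swapped, so each side keeps only as much state as its bound requires.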
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r116796524

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala ---
@@ -0,0 +1,468 @@
[license header, imports, and scaladoc elided — identical to the first quoted diff]
+  private[flink] def analyzeTimeBoundary(
+      condition: RexNode,
+      leftLogicalFieldCnt: Int,
+      leftPhysicalFieldCnt: Int,
+      inputType: RelDataType,
+      rexBuilder: RexBuilder,
+      config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+    // analyze the time condition to get the greater and less conditions,
+    // making sure the left stream field is on the left side of the condition,
+    // e.g. b.proctime > a.proctime - 1 will be translated to a.proctime - 1 < b.proctime
+    val greateConditions = new util.ArrayList[TimeSingleCondition]()
+    val lessConditions = new util.ArrayList[TimeSingleCondition]()
+    analyzeTimeCondition(condition, greateConditions,
+      lessConditions, leftLogicalFieldCnt, inputType)
+    if (greateConditions.size != lessConditions.size
+        || greateConditions.size > 1
+        || greateConditions.size == 0) {
+      throw TableException(
+        "Equality join time condition should have proctime or rowtime indicator."
+      )
+    }
+
+    val greatCond = greateConditions.get(0)
+    val lessCond = lessConditions.get(0)
+    if (greatCond.timeType != lessCond.timeType) {
+      throw TableException(
+        "Equality join time condition should all use proctime or all use rowtime."
+      )
+    }
+
+    var leftStreamWindowSize: Long = 0
+    var rightStreamWindowSize: Long = 0
+
+    // only a.proctime > b.proctime - interval '1' hour needs to store the a stream
+    val timeLiteral: RexLiteral =
+      reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
+    leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+    // only need to store past records
+    if (leftStreamWindowSize < 0) {
+      leftStreamWindowSize = -leftStreamWindowSize
+      if (!greatCond.isEqual) {
+        leftStreamWindowSize -= 1
+      }
+    } else {
+      leftStreamWindowSize = 0
+    }
+
+    // only
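Before the window sizes can be derived, the quoted code validates that the condition contains exactly one lower and one upper time bound, both on the same time type. That pairing check can be sketched as follows (names and exception types are illustrative, not the Flink internals):

```java
// Sketch of the bound-pairing validation in the quoted analyzeTimeBoundary:
// a bounded-range join needs exactly one lower and one upper time bound,
// and both must refer to the same time type (proctime or rowtime).
public class BoundCheckSketch {
    static void validateBounds(int lowerCount, int upperCount,
                               String lowerTimeType, String upperTimeType) {
        if (lowerCount != 1 || upperCount != 1) {
            throw new IllegalArgumentException(
                "Equality join time condition needs exactly one lower and one upper bound.");
        }
        if (!lowerTimeType.equals(upperTimeType)) {
            throw new IllegalArgumentException(
                "Both bounds must use proctime or both must use rowtime.");
        }
    }

    public static void main(String[] args) {
        validateBounds(1, 1, "proctime", "proctime"); // valid: passes silently
        try {
            validateBounds(1, 1, "proctime", "rowtime"); // mixed time types: rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast here keeps the window-size derivation simple, since the code after the check can assume exactly one bound per side.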