[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

2017-05-17 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915
  
Thank you for your suggestion. It sounds good and will be friendlier to 
users than throwing an exception in `FlinkKafkaConsumerBase`. I'll fix it soon, 
thanks :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015242#comment-16015242
 ] 


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of 
> FlinkKafkaConsumer, and the value should be earliest/latest/none. This method 
> only lets the job consume from the beginning or from the most recent data; it 
> cannot specify the offset at which consumption from Kafka should begin. 
> So, there should be a configuration item (such as 
> "flink.source.start.time", with the format "yyyy-MM-dd HH:mm:ss") that 
> allows the user to configure the initial offset of Kafka. The behavior of 
> "flink.source.start.time" is as follows:
> 1) the job starts from a checkpoint / savepoint
>   a> if the offset of a partition can be restored from the checkpoint/savepoint, 
> "flink.source.start.time" is ignored.
>   b> if there is no checkpoint/savepoint for the partition (for example, the 
> partition was newly added), "flink.kafka.start.time" is used to 
> initialize the offset of the partition
> 2) the job has no checkpoint / savepoint; "flink.source.start.time" is used 
> to initialize the Kafka offset
>   a> if "flink.source.start.time" is valid, use it to set the Kafka offset
>   b> if "flink.source.start.time" is out of range, behave as the consumer does 
> currently with no initial offset: get Kafka's current offset and start reading
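
The decision rules in the description above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the pull request: the class and method names are hypothetical, the `TreeMap` stands in for a broker-side timestamp lookup, "valid" is taken to mean "some record has a timestamp at or after the start time", and the start time is assumed to be in UTC with the pattern "yyyy-MM-dd HH:mm:ss".

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Illustrative only: these names are hypothetical and not part of Flink or Kafka. */
final class StartOffsetSketch {

    // Assumed pattern for "flink.source.start.time"; UTC is also an assumption.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Parses the configured start time into epoch milliseconds. */
    static long parseStartTime(String value) {
        return LocalDateTime.parse(value, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /**
     * Picks the start offset for one partition.
     *
     * @param restoredOffset    offset restored from a checkpoint/savepoint, if any
     * @param timestampToOffset record timestamp (ms) to offset; stand-in for a broker lookup
     * @param startTimeMs       configured start time in epoch millis, or null if not set
     * @param currentOffset     what the consumer would use today with no initial offset
     */
    static long resolve(OptionalLong restoredOffset,
                        TreeMap<Long, Long> timestampToOffset,
                        Long startTimeMs,
                        long currentOffset) {
        // Case 1a: restored state wins and the start time is ignored.
        if (restoredOffset.isPresent()) {
            return restoredOffset.getAsLong();
        }
        // No start time configured: keep today's behavior.
        if (startTimeMs == null) {
            return currentOffset;
        }
        // Cases 1b and 2a: first record whose timestamp is at or after the start time.
        Map.Entry<Long, Long> hit = timestampToOffset.ceilingEntry(startTimeMs);
        // Case 2b: start time is past the last record, so fall back.
        return hit != null ? hit.getValue() : currentOffset;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> index = new TreeMap<>();
        index.put(1_000L, 10L);
        index.put(2_000L, 20L);
        index.put(3_000L, 30L);

        System.out.println(resolve(OptionalLong.of(7L), index, 1_500L, 99L));  // restored offset
        System.out.println(resolve(OptionalLong.empty(), index, 1_500L, 99L)); // timestamp lookup
        System.out.println(resolve(OptionalLong.empty(), index, 5_000L, 99L)); // out of range
    }
}
```

Keeping the fallback as an explicit parameter makes the out-of-range case easy to test separately from the lookup itself.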



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015237#comment-16015237
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3915
  
@zjureel For Kafka 0.11, I would expect it to just extend 
`FlinkKafkaConsumer010`.
As you can see, that is also the case right now for 010; it extends 
`FlinkKafkaConsumer09` and not the base class.






[jira] [Commented] (FLINK-6221) Add Prometheus support to metrics

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015224#comment-16015224
 ] 

ASF GitHub Bot commented on FLINK-6221:
---

Github user mbode commented on the issue:

https://github.com/apache/flink/pull/3833
  
@zentol Would you mind checking that I got the shading right?


> Add Prometheus support to metrics
> -
>
> Key: FLINK-6221
> URL: https://issues.apache.org/jira/browse/FLINK-6221
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Joshua Griffith
>Assignee: Maximilian Bode
>Priority: Minor
>
> [Prometheus|https://prometheus.io/] is becoming popular for metrics and 
> alerting. It's possible to use 
> [statsd-exporter|https://github.com/prometheus/statsd_exporter] to load Flink 
> metrics into Prometheus but it would be far easier if Flink supported 
> Prometheus as a metrics reporter. A [dropwizard 
> client|https://github.com/prometheus/client_java/tree/master/simpleclient_dropwizard]
>  exists that could be integrated into the existing metrics system.







[jira] [Created] (FLINK-6620) Add KeyGroupCheckpointedOperator interface that works for checkpointing key-groups

2017-05-17 Thread Jingsong Lee (JIRA)
Jingsong Lee created FLINK-6620:
---

 Summary: Add KeyGroupCheckpointedOperator interface that works for 
checkpointing key-groups
 Key: FLINK-6620
 URL: https://issues.apache.org/jira/browse/FLINK-6620
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Jingsong Lee
Priority: Minor


[~aljoscha] We have discussed it on: 
https://issues.apache.org/jira/browse/BEAM-1393

{code}
import java.io.DataOutputStream;

/**
 * This interface is used to checkpoint key-group state.
 */
public interface KeyGroupCheckpointedOperator extends KeyGroupRestoringOperator {
  /**
   * Snapshots the state for a given {@code keyGroupIndex}.
   *
   * AbstractStreamOperator would call this hook in
   * AbstractStreamOperator.snapshotState() while iterating over the key groups.
   *
   * @param keyGroupIndex the id of the key-group to be put in the snapshot.
   * @param out the stream to write to.
   */
  void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception;
}
{code}
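
To make the proposal more concrete, here is a rough sketch of how an operator might implement the interface. It rests on assumptions: the issue does not show `KeyGroupRestoringOperator`, so the `restoreKeyGroupState` signature below is a guess at its shape, and `CountingOperator` with its helpers is purely hypothetical. The interfaces are redeclared locally so the example compiles on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Assumed shape of the parent interface; the issue text does not show it.
interface KeyGroupRestoringOperator {
    void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception;
}

// Same method as proposed in the issue, redeclared so this sketch is self-contained.
interface KeyGroupCheckpointedOperator extends KeyGroupRestoringOperator {
    void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception;
}

/** Hypothetical operator that keeps one counter map per key group. */
final class CountingOperator implements KeyGroupCheckpointedOperator {
    private final Map<Integer, Map<String, Long>> countsByKeyGroup = new HashMap<>();

    void increment(int keyGroupIndex, String key) {
        countsByKeyGroup.computeIfAbsent(keyGroupIndex, k -> new HashMap<>())
                .merge(key, 1L, Long::sum);
    }

    long count(int keyGroupIndex, String key) {
        Map<String, Long> counts =
                countsByKeyGroup.getOrDefault(keyGroupIndex, Collections.emptyMap());
        return counts.getOrDefault(key, 0L);
    }

    @Override
    public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
        // Write only the entries that belong to the requested key group.
        Map<String, Long> counts =
                countsByKeyGroup.getOrDefault(keyGroupIndex, Collections.emptyMap());
        out.writeInt(counts.size());
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeLong(entry.getValue());
        }
    }

    @Override
    public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
        // Read back exactly what snapshotKeyGroupState wrote for this key group.
        int size = in.readInt();
        Map<String, Long> counts = new HashMap<>();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            counts.put(key, in.readLong());
        }
        countsByKeyGroup.put(keyGroupIndex, counts);
    }

    /** Snapshots one key group of {@code from} and restores it into a fresh operator. */
    static CountingOperator roundTrip(CountingOperator from, int keyGroupIndex) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                from.snapshotKeyGroupState(keyGroupIndex, out);
            }
            CountingOperator restored = new CountingOperator();
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                restored.restoreKeyGroupState(keyGroupIndex, in);
            }
            return restored;
        } catch (Exception e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

Because each call handles a single key group, state written for one group can be restored without touching the others, which seems to be the point of the per-key-group hook.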





[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015217#comment-16015217
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117162289
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.
 


-   0.10.0.1
+   0.10.1.0
--- End diff --

The dependency trees of 0.10.0.1 and 0.10.1.0 are the same when I print 
them with `mvn dependency:tree`:
+- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile

+- org.apache.kafka:kafka-clients:jar:0.10.1.0:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile






[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015216#comment-16015216
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915
  
@tzulitai Glad to hear from you. In fact, I was also torn over whether to 
put the `setStartFromSpecificDate` method into `FlinkKafkaConsumerBase`, and I 
finally put it there for two reasons:

1. All the other methods that set the Kafka start offset are in 
`FlinkKafkaConsumerBase`; to stay consistent with them, I put 
`setStartFromSpecificDate` in `FlinkKafkaConsumerBase` as well.
2. Subsequent versions of Kafka, such as version 0.11, should also support 
this feature, but their consumers may need to extend 
`FlinkKafkaConsumerBase` directly. I think this method will be used in multiple 
implementations, so I put `setStartFromSpecificDate` in `FlinkKafkaConsumerBase`.
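
A small sketch of the trade-off being discussed, under assumptions: the classes below are stand-ins rather than the real Flink connectors, and the `long` parameter of `setStartFromSpecificDate` is hypothetical because the thread names the method but not its signature. The 0.11 class is likewise hypothetical.

```java
/** Stand-in for FlinkKafkaConsumerBase; not the real class. */
abstract class ConsumerBaseSketch {
    private Long startTimestampMs; // null means no start date was configured

    // Hypothetical signature: the thread does not show the parameters.
    ConsumerBaseSketch setStartFromSpecificDate(long timestampMs) {
        this.startTimestampMs = timestampMs;
        return this;
    }

    Long getStartTimestampMs() {
        return startTimestampMs;
    }

    abstract String kafkaVersion();
}

/** Stand-in for FlinkKafkaConsumer09. */
class Consumer09Sketch extends ConsumerBaseSketch {
    @Override
    String kafkaVersion() {
        return "0.9";
    }
}

/** Stand-in for FlinkKafkaConsumer010, which extends the 0.9 consumer. */
class Consumer010Sketch extends Consumer09Sketch {
    @Override
    String kafkaVersion() {
        return "0.10";
    }
}

/** Hypothetical 0.11 consumer; here it extends the 0.10 one. */
class Consumer011Sketch extends Consumer010Sketch {
    @Override
    String kafkaVersion() {
        return "0.11";
    }
}

final class HierarchyDemo {
    public static void main(String[] args) {
        // The setter is declared once in the base class and inherited down the chain.
        ConsumerBaseSketch consumer = new Consumer011Sketch().setStartFromSpecificDate(42L);
        System.out.println(consumer.kafkaVersion() + " starts at " + consumer.getStartTimestampMs());
    }
}
```

With the setter on the base class, a later consumer inherits it whether it extends the base directly or an existing version-specific class. The cost is that version-specific classes which cannot honor the setting have to deal with it themselves.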






[jira] [Commented] (FLINK-6618) Fix GroupWindowStringExpressionTest testcase bug

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015213#comment-16015213
 ] 

ASF GitHub Bot commented on FLINK-6618:
---

GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3936

[FLINK-6618][table] Fix GroupWindowStringExpressionTest test case bug

In this PR, I fixed the `GroupWindowStringExpressionTest` test case bug.
- [x] General
  - The pull request references the related JIRA issue 
("[FLINK-6618][table] Fix GroupWindowStringExpressionTest testcase bug")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-6618-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3936.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3936


commit f331798000bbf8ce6316862545c761583e8b9eef
Author: sunjincheng121 
Date:   2017-05-18T05:02:24Z

[FLINK-6618][table] Fix GroupWindowStringExpressionTest testcase bug




> Fix GroupWindowStringExpressionTest testcase bug
> 
>
> Key: FLINK-6618
> URL: https://issues.apache.org/jira/browse/FLINK-6618
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> I found 2 bugs, as follows:
> 1. {{GroupWindowStringExpressionTest}} testcase bug:
> {{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, 
> resJava.logicalPlan)}} -> {{Assert.assertEquals("Logical Plans do not match", 
> resJava.logicalPlan, resScala.logicalPlan)}}
> 2. After fixing the bug above, another bug appears:
> {code}
> java.lang.AssertionError: Logical Plans do not match 
> Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as 
> '_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 
> 1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) 
> as 'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, 
> WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 
> 'long, 
> 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER 
> int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
> Actual   :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as 
> '_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 
> 1440.millis, 
> 720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, 
> sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, 
> WeightedAvg(ArrayBuffer('int, 'int)) as 'TMP_3),Project(ArrayBuffer('string, 
> 'int, 'long, 
> 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER 
> int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
>  {code}
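
The first bug is easy to reproduce in isolation: an assertion that compares a value with itself can never fail, so it hides any real difference. The sketch below is an illustration only. The plan strings are abbreviated stand-ins for the real logical plans, and `plansMatch` is a hypothetical helper rather than part of the Flink test code.

```java
import java.util.Objects;

/** Illustrative only; the plan strings are shortened stand-ins. */
final class PlanComparisonSketch {

    /** Hypothetical helper mirroring what an assertEquals on two plans checks. */
    static boolean plansMatch(String expected, String actual) {
        return Objects.equals(expected, actual);
    }

    public static void main(String[] args) {
        String resJava = "Project('string, 'TMP_4)";
        String resScala = "Project('string, 'TMP_0)";

        // Buggy check: the Java plan is compared with itself, so it always matches.
        System.out.println(plansMatch(resJava, resJava));

        // Intended check: the Java plan is compared with the Scala plan,
        // which exposes the differing generated names.
        System.out.println(plansMatch(resJava, resScala));
    }
}
```

Once the comparison is corrected, the generated attribute names and the collection types (`List` versus `ArrayBuffer`) in the two plans become visible as the second failure reported above.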







[jira] [Updated] (FLINK-6618) Fix GroupWindowStringExpressionTest testcase bug

2017-05-17 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6618:
---
Summary: Fix GroupWindowStringExpressionTest testcase bug  (was: Fix 
`GroupWindow` JAVA logical plans not consistent with SCALA logical plans.)



[jira] [Commented] (FLINK-6495) Migrate Akka configuration options

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015191#comment-16015191
 ] 

ASF GitHub Bot commented on FLINK-6495:
---

GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/3935

[FLINK-6495] Migrate Akka configuration options

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-6495

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3935.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3935


commit c87718694052e499875d78c7ef2bc9573dc0cc4e
Author: zjureel 
Date:   2017-05-18T04:34:40Z

[FLINK-6495] Migrate Akka configuration options




> Migrate Akka configuration options
> --
>
> Key: FLINK-6495
> URL: https://issues.apache.org/jira/browse/FLINK-6495
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>








[jira] [Updated] (FLINK-6618) Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans.

2017-05-17 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6618:
---
Issue Type: Sub-task  (was: Bug)
Parent: FLINK-6619



[jira] [Created] (FLINK-6619) Check Table API & SQL support for 1.3.0 RC01 Release

2017-05-17 Thread sunjincheng (JIRA)
sunjincheng created FLINK-6619:
--

 Summary: Check Table API & SQL support for 1.3.0 RC01 Release
 Key: FLINK-6619
 URL: https://issues.apache.org/jira/browse/FLINK-6619
 Project: Flink
  Issue Type: Test
  Components: Table API & SQL
Affects Versions: 1.3.0
Reporter: sunjincheng
Assignee: sunjincheng


In this JIRA, I will do the following tasks for the Flink 1.3.0 RC01 release:
* Check that the JAVA and SCALA logical plans are consistent.
* Check that the SQL and Table API logical plans are consistent.
* Check that UDF, UDTF, and UDAF are working properly in group-windows and 
over-windows.
* Check that all built-in aggregations on Batch and Stream are working properly.

While working on the tasks above, I'll create sub-tasks as needed.





[jira] [Updated] (FLINK-6617) Improve JAVA and SCALA logical plans consistent test

2017-05-17 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6617:
---
Issue Type: Sub-task  (was: Test)
Parent: FLINK-6619

> Improve JAVA and SCALA logical plans consistent test
> 
>
> Key: FLINK-6617
> URL: https://issues.apache.org/jira/browse/FLINK-6617
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, we need some `StringExpression` tests for all Java and Scala APIs,
> such as `GroupAggregations`, `GroupWindowAggregaton` (Session, Tumble), `Calc`, 
> etc.





[jira] [Updated] (FLINK-6618) Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans.

2017-05-17 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6618:
---
Description: 
I found 2 bugs:

1. {{GroupWindowStringExpressionTest}} test case bug (the Java plan is compared with itself):

{{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, 
resJava.logicalPlan)}} -> {{Assert.assertEquals("Logical Plans do not match", 
resJava.logicalPlan, resScala.logicalPlan)}}

2. After fixing the bug above, I hit another bug:

{code}
java.lang.AssertionError: Logical Plans do not match 
Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as 
'_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 
1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) as 
'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, 
WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 
'long, 
'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, 
BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
Actual   :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as 
'_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 
1440.millis, 
720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, 
sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, 
WeightedAvg(ArrayBuffer('int, 'int)) as 'TMP_3),Project(ArrayBuffer('string, 
'int, 'long, 
'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, 
BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
 {code}


  was:
I find 2 bugs as follows:
1. {{GroupWindowStringExpressionTest}} testcase bug, 
   {{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, 
resJava.logicalPlan)}} -> {{Assert.assertEquals("Logical Plans do not match", 
resJava.logicalPlan, resScala.logicalPlan)}}
2. After fixing the bug above, I hit another bug:
{code}
java.lang.AssertionError: Logical Plans do not match 
Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as 
'_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 
1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) as 
'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, 
WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 
'long, 
'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, 
BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
Actual   :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as 
'_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 
1440.millis, 
720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, 
sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, 
WeightedAvg(ArrayBuffer('int, 'int)) as 'TMP_3),Project(ArrayBuffer('string, 
'int, 'long, 
'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, 
BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
 {code}



> Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans.
> -
>
> Key: FLINK-6618
> URL: https://issues.apache.org/jira/browse/FLINK-6618
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> I find 2 bugs as follows:
> 1. {{GroupWindowStringExpressionTest}} testcase bug:
> {{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, 
> resJava.logicalPlan)}} -> {{Assert.assertEquals("Logical Plans do not match", 
> resJava.logicalPlan, resScala.logicalPlan)}}
> 2. After fixing the bug above, I hit another bug:
> {code}
> java.lang.AssertionError: Logical Plans do not match 
> Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as 
> '_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 
> 1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) 
> as 'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, 
> WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 
> 'long, 
> 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER 
> int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
> Actual   :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as 
> '_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 
> 1440.millis, 
> 720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, 
> sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, 
> WeightedAvg(ArrayBuffer('int, 

[jira] [Updated] (FLINK-6618) Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans.

2017-05-17 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6618:
---
Description: 
I find 2 bugs as follows:
1. {{GroupWindowStringExpressionTest}} testcase bug, 
   {{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, 
resJava.logicalPlan)}} -> {{Assert.assertEquals("Logical Plans do not match", 
resJava.logicalPlan, resScala.logicalPlan)}}
2. After fixing the bug above, I hit another bug:
{code}
java.lang.AssertionError: Logical Plans do not match 
Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as 
'_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 
1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) as 
'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, 
WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 
'long, 
'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, 
BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
Actual   :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as 
'_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 
1440.millis, 
720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, 
sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, 
WeightedAvg(ArrayBuffer('int, 'int)) as 'TMP_3),Project(ArrayBuffer('string, 
'int, 'long, 
'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, 
BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
 {code}


  was:
I find 2 bugs as follows:
1. `GroupWindowStringExpressionTest` testcase bug, 
   `Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, 
resJava.logicalPlan)` -> `Assert.assertEquals("Logical Plans do not match", 
resJava.logicalPlan, resScala.logicalPlan)`
2. After fixing the bug above, I hit another bug:
{code}
java.lang.AssertionError: Logical Plans do not match 
Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as 
'_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 
1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) as 
'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, 
WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 
'long, 
'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, 
BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
Actual   :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as 
'_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 
1440.millis, 
720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, 
sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, 
WeightedAvg(ArrayBuffer('int, 'int)) as 'TMP_3),Project(ArrayBuffer('string, 
'int, 'long, 
'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, 
BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
 {code}



> Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans.
> -
>
> Key: FLINK-6618
> URL: https://issues.apache.org/jira/browse/FLINK-6618
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> I find 2 bugs as follows:
> 1. {{GroupWindowStringExpressionTest}} testcase bug, 
>{{Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, 
> resJava.logicalPlan)}} -> {{Assert.assertEquals("Logical Plans do not match", 
> resJava.logicalPlan, resScala.logicalPlan)}}
> 2. After fixing the bug above, I hit another bug:
> {code}
> java.lang.AssertionError: Logical Plans do not match 
> Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as 
> '_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 
> 1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) 
> as 'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, 
> WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 
> 'long, 
> 'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER 
> int, BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
> Actual   :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as 
> '_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 
> 1440.millis, 
> 720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, 
> sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, 
> WeightedAvg(ArrayBuffer('int, 'int)) 

[jira] [Created] (FLINK-6618) Fix `GroupWindow` JAVA logical plans not consistent with SCALA logical plans.

2017-05-17 Thread sunjincheng (JIRA)
sunjincheng created FLINK-6618:
--

 Summary: Fix `GroupWindow` JAVA logical plans not consistent with 
SCALA logical plans.
 Key: FLINK-6618
 URL: https://issues.apache.org/jira/browse/FLINK-6618
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.3.0
Reporter: sunjincheng
Assignee: sunjincheng


I found 2 bugs:
1. `GroupWindowStringExpressionTest` test case bug (the Java plan is compared with itself): 
   `Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, 
resJava.logicalPlan)` -> `Assert.assertEquals("Logical Plans do not match", 
resJava.logicalPlan, resScala.logicalPlan)`
2. After fixing the bug above, I hit another bug:
{code}
java.lang.AssertionError: Logical Plans do not match 
Expected :Project(ListBuffer('string, 'TMP_4, 'TMP_5, 'TMP_6, ('TMP_7 * 2) as 
'_c4),WindowAggregate(List('string),SlidingGroupWindow('w, 'rowtime, 
1440.millis, 720.millis),List(),List(CountAggFunction(List('string)) as 
'TMP_4, sum('int) as 'TMP_5, WeightedAvg(List('long, 'int)) as 'TMP_6, 
WeightedAvg(List('int, 'int)) as 'TMP_7),Project(ArrayBuffer('string, 'int, 
'long, 
'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, 
BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
Actual   :Project(ListBuffer('string, 'TMP_0, 'TMP_1, 'TMP_2, ('TMP_3 * 2) as 
'_c4),WindowAggregate(ArrayBuffer('string),SlidingGroupWindow('w, 'rowtime, 
1440.millis, 
720.millis),List(),List(CountAggFunction(ArrayBuffer('string)) as 'TMP_0, 
sum('int) as 'TMP_1, WeightedAvg(ArrayBuffer('long, 'int)) as 'TMP_2, 
WeightedAvg(ArrayBuffer('int, 'int)) as 'TMP_3),Project(ArrayBuffer('string, 
'int, 'long, 
'rowtime),CatalogNode(WrappedArray(_DataStreamTable_0),RecordType(INTEGER int, 
BIGINT long, VARCHAR(2147483647) string, TIMESTAMP(3) rowtime)
 {code}
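
The first bug in this report can be illustrated with a minimal sketch. It assumes plain strings as stand-ins for the logical plans; the class and method names (`SelfComparisonSketch`, `selfComparison`, `crossComparison`) are hypothetical and are not part of the Flink test code. It only shows why an assertion that compares a value with itself can never fail, which is how the mismatch in the second bug could stay hidden.

```java
// Hypothetical sketch, not Flink code: plans are represented as plain strings.
final class SelfComparisonSketch {

    // Mirrors the original assertion: the Java plan is compared with itself,
    // so the result is true no matter what the Scala plan looks like.
    static boolean selfComparison(String javaPlan, String scalaPlan) {
        return javaPlan.equals(javaPlan);
    }

    // Mirrors the corrected assertion: the two plans are compared with each other.
    static boolean crossComparison(String javaPlan, String scalaPlan) {
        return javaPlan.equals(scalaPlan);
    }

    public static void main(String[] args) {
        String javaPlan = "sum('int) as 'TMP_1";   // illustrative fragment only
        String scalaPlan = "sum('int) as 'TMP_5";  // illustrative fragment only
        System.out.println(selfComparison(javaPlan, scalaPlan));
        System.out.println(crossComparison(javaPlan, scalaPlan));
    }
}
```

With the self-comparison the check passes for any pair of plans; only the cross-comparison can surface a difference such as the one in the report above.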






[jira] [Created] (FLINK-6617) Improve JAVA and SCALA logical plans consistent test

2017-05-17 Thread sunjincheng (JIRA)
sunjincheng created FLINK-6617:
--

 Summary: Improve JAVA and SCALA logical plans consistent test
 Key: FLINK-6617
 URL: https://issues.apache.org/jira/browse/FLINK-6617
 Project: Flink
  Issue Type: Test
  Components: Table API & SQL
Affects Versions: 1.3.0
Reporter: sunjincheng
Assignee: sunjincheng


Currently, we need some `StringExpression` tests for all Java and Scala APIs,
such as `GroupAggregations`, `GroupWindowAggregaton` (Session, Tumble), `Calc`, etc.






[jira] [Commented] (FLINK-6608) Relax Kerberos login contexts parsing by trimming whitespaces in contexts list

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015062#comment-16015062
 ] 

ASF GitHub Bot commented on FLINK-6608:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3928#discussion_r117146767
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -230,10 +230,15 @@ private void validate() {
}
 
private static List parseList(String value) {
-   if(value == null) {
+   if(value == null || value.isEmpty()) {
return Collections.emptyList();
}
-   return Arrays.asList(value.split(","));
+
+   return Arrays.asList(value
+   .replaceAll("\\s*,\\s*", ",") // remove whitespaces surrounding commas
+   .replaceAll(",,", ",") // remove empty entries
--- End diff --

Hi @tzulitai, `.replaceAll("\\s*,\\s*", ",").replaceAll(",,", ",") ` cannot 
handle input such as `" a, b,,, c d, e "`.
I think we can use one of the following expressions instead:
 `str.trim().replaceAll("(\\s*,+\\s*)+", ",")` OR 
`str.replace("/^\\s+|\\s+$/g","").replaceAll("(\\s*,\\s*)+", ",");`
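
As a rough sketch of the first suggested expression, the following shows how `parseList` might look with it. The class name `ParseListSketch`, the extra filter for empty entries, and the handling of whitespace-only input are assumptions made for illustration, not the code in the pull request. It avoids Java 8 features on the assumption that the module still has to build with Java 7.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not the actual SecurityUtils implementation.
final class ParseListSketch {

    static List<String> parseList(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Collections.emptyList();
        }
        // Collapse every run of commas, together with the whitespace around
        // it, into a single comma.
        String normalized = value.trim().replaceAll("(\\s*,+\\s*)+", ",");

        List<String> entries = new ArrayList<>();
        for (String entry : normalized.split(",")) {
            // A leading comma would otherwise leave one empty first entry.
            if (!entry.isEmpty()) {
                entries.add(entry);
            }
        }
        return entries;
    }

    public static void main(String[] args) {
        System.out.println(parseList(" a, b,,, c d, e ")); // prints [a, b, c d, e]
    }
}
```

For the same input, the two-step `replaceAll("\\s*,\\s*", ",").replaceAll(",,", ",")` leaves `,,` between `b` and `c d` and keeps the outer whitespace, so splitting yields an empty entry. Note also that `String.replace` treats its first argument as a literal rather than a regular expression, so the sketch relies only on `trim()` and `replaceAll`.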



> Relax Kerberos login contexts parsing by trimming whitespaces in contexts list
> --
>
> Key: FLINK-6608
> URL: https://issues.apache.org/jira/browse/FLINK-6608
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Security
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Minor
> Fix For: 1.3.0, 1.4.0
>
>
> The Kerberos login contexts list parsing right now isn't quite user-friendly.
> The list must be provided as: {{security.kerberos.login.contexts: 
> Client,KafkaClient}}, without any whitespace in between.
> We can relax this to be more user-friendly by trimming any whitespaces in the 
> list.
> A user actually stumbled across this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problems-with-Kerberos-Kafka-connection-in-version-1-2-0-td12580.html#a12589





[GitHub] flink pull request #3928: [FLINK-6608] [security, config] Relax Kerberos log...

2017-05-17 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3928#discussion_r117146767
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -230,10 +230,15 @@ private void validate() {
}
 
private static List parseList(String value) {
-   if(value == null) {
+   if(value == null || value.isEmpty()) {
return Collections.emptyList();
}
-   return Arrays.asList(value.split(","));
+
+   return Arrays.asList(value
+   .replaceAll("\\s*,\\s*", ",") // remove whitespaces surrounding commas
+   .replaceAll(",,", ",") // remove empty entries
--- End diff --

Hi @tzulitai, `.replaceAll("\\s*,\\s*", ",").replaceAll(",,", ",") ` cannot 
handle input such as `" a, b,,, c d, e "`.
I think we can use one of the following expressions instead:
 `str.trim().replaceAll("(\\s*,+\\s*)+", ",")` OR 
`str.replace("/^\\s+|\\s+$/g","").replaceAll("(\\s*,\\s*)+", ",");`





[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015057#comment-16015057
 ] 

ASF GitHub Bot commented on FLINK-6075:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3889#discussion_r117146377
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, ListTypeInfo}
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.configuration.Configuration
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import java.util.{List => JList, ArrayList => JArrayList}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Process Function used for the aggregate in bounded rowtime sort without offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param fieldCount Is used to indicate fields in the current element to forward
+ * @param inputType It is used to mark the type of the incoming data
+ * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation
+ */
+class RowTimeSortProcessFunction(
+  private val fieldCount: Int,
+  private val inputRowType: CRowTypeInfo,
+  private val rowComparator: CollectionRowComparator)
+extends ProcessFunction[CRow, CRow] {
+
+  Preconditions.checkNotNull(rowComparator)
+
+  private val sortArray: ArrayList[Row] = new ArrayList[Row]
+  
+  // the state which keeps all the events that are not expired.
+  // Each timestamp will contain an associated list with the events 
+  // received at that timestamp
+  private var dataState: MapState[Long, JList[Row]] = _
+
+// the state which keeps the last triggering timestamp to filter late events
+  private var lastTriggeringTsState: ValueState[Long] = _
+  
+  private var outputC: CRow = _
+  
+  
+  override def open(config: Configuration) {
+ 
+val keyTypeInformation: TypeInformation[Long] =
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](
+inputRowType.asInstanceOf[CRowTypeInfo].rowType)
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]](
+"dataState",
+keyTypeInformation,
+valueTypeInformation)
+
+dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+  }
+
+  
+  override def processElement(
+inputC: CRow,
+ctx: ProcessFunction[CRow, 

[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-05-17 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3889#discussion_r117146377
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, ListTypeInfo}
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.configuration.Configuration
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import java.util.{List => JList, ArrayList => JArrayList}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Process Function used for the aggregate in bounded rowtime sort without offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param fieldCount Is used to indicate fields in the current element to forward
+ * @param inputType It is used to mark the type of the incoming data
+ * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation
+ */
+class RowTimeSortProcessFunction(
+  private val fieldCount: Int,
+  private val inputRowType: CRowTypeInfo,
+  private val rowComparator: CollectionRowComparator)
+extends ProcessFunction[CRow, CRow] {
+
+  Preconditions.checkNotNull(rowComparator)
+
+  private val sortArray: ArrayList[Row] = new ArrayList[Row]
+  
+  // the state which keeps all the events that are not expired.
+  // Each timestamp will contain an associated list with the events 
+  // received at that timestamp
+  private var dataState: MapState[Long, JList[Row]] = _
+
+// the state which keeps the last triggering timestamp to filter late events
+  private var lastTriggeringTsState: ValueState[Long] = _
+  
+  private var outputC: CRow = _
+  
+  
+  override def open(config: Configuration) {
+ 
+val keyTypeInformation: TypeInformation[Long] =
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](
+inputRowType.asInstanceOf[CRowTypeInfo].rowType)
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]](
+"dataState",
+keyTypeInformation,
+valueTypeInformation)
+
+dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+  }
+
+  
+  override def processElement(
+inputC: CRow,
+ctx: ProcessFunction[CRow, CRow]#Context,
+out: Collector[CRow]): Unit = {
+
+ val input = inputC.row
+
+ if( outputC == null) {
+  outputC = new CRow(input, true)
+}
+
+// triggering timestamp for trigger 

[jira] [Commented] (FLINK-6608) Relax Kerberos login contexts parsing by trimming whitespaces in contexts list

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015027#comment-16015027
 ] 

ASF GitHub Bot commented on FLINK-6608:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/3928#discussion_r117144044
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -230,10 +231,21 @@ private void validate() {
}
 
private static List parseList(String value) {
-   if(value == null) {
+   if(value == null || value.isEmpty()) {
return Collections.emptyList();
}
-   return Arrays.asList(value.split(","));
--- End diff --

The wisdom of that paragraph has been debated extensively on stackoverflow. 
 I'll just say that the `StringTokenizer` is not deprecated and, in my opinion, 
fits this scenario uniquely well.   Go ahead either way.
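
For comparison, a minimal sketch of the `StringTokenizer` approach mentioned here could look as follows. The class name `TokenizerParseSketch` and the per-token `trim()` are assumptions for illustration; this is not the code proposed in the pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical sketch, not the actual SecurityUtils implementation.
final class TokenizerParseSketch {

    static List<String> parseList(String value) {
        if (value == null) {
            return Collections.emptyList();
        }
        List<String> entries = new ArrayList<>();
        // With "," as the only delimiter, consecutive commas produce no empty
        // tokens, so only whitespace needs to be handled per token.
        StringTokenizer tokenizer = new StringTokenizer(value, ",");
        while (tokenizer.hasMoreTokens()) {
            String entry = tokenizer.nextToken().trim();
            if (!entry.isEmpty()) {
                entries.add(entry);
            }
        }
        return entries;
    }

    public static void main(String[] args) {
        System.out.println(parseList(" a, b,,, c d, e ")); // prints [a, b, c d, e]
    }
}
```

Using only the comma as delimiter keeps an entry such as `c d` intact; adding the space character to the delimiter set would split it into two entries.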


> Relax Kerberos login contexts parsing by trimming whitespaces in contexts list
> --
>
> Key: FLINK-6608
> URL: https://issues.apache.org/jira/browse/FLINK-6608
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Security
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Minor
> Fix For: 1.3.0, 1.4.0
>
>
> The Kerberos login contexts list parsing right now isn't quite user-friendly.
> The list must be provided as: {{security.kerberos.login.contexts: 
> Client,KafkaClient}}, without any whitespace in between.
> We can relax this to be more user-friendly by trimming any whitespaces in the 
> list.
> A user actually stumbled across this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problems-with-Kerberos-Kafka-connection-in-version-1-2-0-td12580.html#a12589





[GitHub] flink pull request #3928: [FLINK-6608] [security, config] Relax Kerberos log...

2017-05-17 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/3928#discussion_r117144044
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -230,10 +231,21 @@ private void validate() {
}
 
private static List parseList(String value) {
-   if(value == null) {
+   if(value == null || value.isEmpty()) {
return Collections.emptyList();
}
-   return Arrays.asList(value.split(","));
--- End diff --

The wisdom of that paragraph has been debated extensively on stackoverflow. 
 I'll just say that the `StringTokenizer` is not deprecated and, in my opinion, 
fits this scenario uniquely well.   Go ahead either way.




[jira] [Commented] (FLINK-6608) Relax Kerberos login contexts parsing by trimming whitespaces in contexts list

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015010#comment-16015010
 ] 

ASF GitHub Bot commented on FLINK-6608:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3928#discussion_r117142993
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -230,10 +231,21 @@ private void validate() {
}
 
private static List parseList(String value) {
-   if(value == null) {
+   if(value == null || value.isEmpty()) {
return Collections.emptyList();
}
-   return Arrays.asList(value.split(","));
--- End diff --

@EronWright your suggestion would work, but I'd prefer using a regular 
expression. The JDK documentation makes the same recommendation:
 `StringTokenizer is a legacy class that is retained for compatibility 
reasons although its use is discouraged in new code. It is recommended that 
anyone seeking this functionality use the split method of String or the 
java.util.regex package instead.`
Please see: 
http://docs.oracle.com/javase/7/docs/api/java/util/StringTokenizer.html
What do you think, @EronWright @tzulitai?
Best,
SunJincheng



> Relax Kerberos login contexts parsing by trimming whitespaces in contexts list
> --
>
> Key: FLINK-6608
> URL: https://issues.apache.org/jira/browse/FLINK-6608
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Security
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Minor
> Fix For: 1.3.0, 1.4.0
>
>
> The Kerberos login contexts list parsing right now isn't quite user-friendly.
> The list must be provided as: {{security.kerberos.login.contexts: 
> Client,KafkaClient}}, without any whitespace in between.
> We can relax this to be more user-friendly by trimming any whitespaces in the 
> list.
> A user actually stumbled across this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problems-with-Kerberos-Kafka-connection-in-version-1-2-0-td12580.html#a12589





[GitHub] flink pull request #3928: [FLINK-6608] [security, config] Relax Kerberos log...

2017-05-17 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3928#discussion_r117142993
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -230,10 +231,21 @@ private void validate() {
}
 
private static List parseList(String value) {
-   if(value == null) {
+   if(value == null || value.isEmpty()) {
return Collections.emptyList();
}
-   return Arrays.asList(value.split(","));
--- End diff --

@EronWright your suggestion would work, but I'd prefer to use a regular 
expression. The JDK documentation makes the same recommendation:
 `StringTokenizer is a legacy class that is retained for compatibility 
reasons although its use is discouraged in new code. It is recommended that 
anyone seeking this functionality use the split method of String or the 
java.util.regex package instead.`
Please see: 
http://docs.oracle.com/javase/7/docs/api/java/util/StringTokenizer.html
What do you think? @EronWright @tzulitai 
Best,
SunJincheng
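
For comparison, here is a minimal sketch of the two approaches discussed in this thread, assuming the goal is to drop surrounding whitespace and empty entries. The class and method names (`ContextListParsing`, `parseWithSplit`, `parseWithTokenizer`) are illustrative only and are not taken from the pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;

public class ContextListParsing {

    // Variant 1: String.split plus trim(); empty entries are skipped.
    static List<String> parseWithSplit(String value) {
        if (value == null || value.isEmpty()) {
            return Collections.emptyList();
        }
        List<String> result = new ArrayList<String>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    // Variant 2: StringTokenizer with ',' and ' ' as delimiter characters.
    static List<String> parseWithTokenizer(String value) {
        if (value == null || value.isEmpty()) {
            return Collections.emptyList();
        }
        List<String> result = new ArrayList<String>();
        StringTokenizer tokenizer = new StringTokenizer(value, ", ");
        while (tokenizer.hasMoreTokens()) {
            result.add(tokenizer.nextToken());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseWithSplit(" Client , KafkaClient "));
        System.out.println(parseWithTokenizer(",X, ,,Y , Z,"));
    }
}
```

The two variants differ for entries that contain inner spaces: the tokenizer would split `Kafka Client` into two entries, while the split variant keeps it as one.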





[jira] [Commented] (FLINK-6065) Make TransportClient for ES5 pluggable

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014957#comment-16014957
 ] 

ASF GitHub Bot commented on FLINK-6065:
---

Github user sschaef commented on the issue:

https://github.com/apache/flink/pull/3934
  
The Java7 run failed because my solution uses Java8 features. I guess I 
have to find another way to fix this issue then.
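
A minimal sketch of how a pluggable client factory can stay Java 7 compatible: a single-method interface implemented with an anonymous class instead of a lambda or `java.util.function.Function`. `ClientFactory`, `FakeClient` and this `initClient` signature are hypothetical stand-ins, not the interfaces from the pull request or from Elasticsearch.

```java
import java.util.HashMap;
import java.util.Map;

public class PluggableClientSketch {

    // Hypothetical factory: one abstract method, usable from Java 7 and Java 8.
    interface ClientFactory<C> {
        C create(Map<String, String> config);
    }

    // Hypothetical stand-in for a transport client.
    static class FakeClient {
        final String clusterName;

        FakeClient(String clusterName) {
            this.clusterName = clusterName;
        }
    }

    // The bridge does not hardcode a concrete client; it delegates to the factory.
    static <C> C initClient(Map<String, String> config, ClientFactory<C> factory) {
        return factory.create(config);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<String, String>();
        config.put("cluster.name", "test-cluster");

        // Anonymous inner class: compiles at source level 1.7, unlike a lambda.
        FakeClient client = initClient(config, new ClientFactory<FakeClient>() {
            @Override
            public FakeClient create(Map<String, String> cfg) {
                return new FakeClient(cfg.get("cluster.name"));
            }
        });
        System.out.println(client.clusterName);
    }
}
```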


> Make TransportClient for ES5 pluggable
> --
>
> Key: FLINK-6065
> URL: https://issues.apache.org/jira/browse/FLINK-6065
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector, Streaming Connectors
>Reporter: Robert Metzger
>
> This JIRA is based on a user request: 
> http://stackoverflow.com/questions/42807454/flink-xpack-elasticsearch-5-elasticsearchsecurityexception-missing-autentication?noredirect=1#comment72728053_42807454
> Currently, in the {{Elasticsearch5ApiCallBridge}} the 
> {{PreBuiltTransportClient}} is hardcoded. It would be nice to make this 
> client pluggable to allow using other clients such as the 
> {{PreBuiltXPackTransportClient}}.





[GitHub] flink issue #3934: [FLINK-6065] Add initClient method to ElasticsearchApiCal...

2017-05-17 Thread sschaef
Github user sschaef commented on the issue:

https://github.com/apache/flink/pull/3934
  
The Java7 run failed because my solution uses Java8 features. I guess I 
have to find another way to fix this issue then.




[jira] [Commented] (FLINK-6578) SharedBuffer creates self-loops when having elements with same value/timestamp.

2017-05-17 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014903#comment-16014903
 ] 

Ted Yu commented on FLINK-6578:
---

SharedBuffer.java isn't shown in the PR, so allow me to comment here.
{code}
public int hashCode() {
  return (int) (31 * (timestamp ^ timestamp >>> 32) + 31 * value.hashCode()) + counter;
}
{code}
Multiplier of 31 is applied to both (timestamp ^ timestamp >>> 32) and 
value.hashCode().
The following is probably the right expression:
{code}
  return (int) 31 * (31 * (timestamp ^ timestamp >>> 32) + 
value.hashCode()) + counter;
{code}
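
A small compilable sketch of the two formulas, assuming `timestamp` is a `long` and `counter` an `int`, as the snippets suggest. Note that in the second expression as written, `(int)` binds only to the literal `31`, so the product stays a `long` and would not compile as an `int` return value; the sketch casts the whole product instead. The class name `HashSketch` and its method names are hypothetical.

```java
public class HashSketch {

    // Formula currently in the code: 31 is applied to both terms separately,
    // so the result is 31 * (foldedTimestamp + valueHash) + counter.
    static int currentHash(long timestamp, Object value, int counter) {
        return (int) (31 * (timestamp ^ timestamp >>> 32) + 31 * value.hashCode()) + counter;
    }

    // Chained variant: 31 is applied once per combination step.
    // '>>>' binds tighter than '^', so the fold is timestamp ^ (timestamp >>> 32).
    static int chainedHash(long timestamp, Object value, int counter) {
        return (int) (31 * (31 * (timestamp ^ timestamp >>> 32) + value.hashCode())) + counter;
    }

    public static void main(String[] args) {
        // Integer.hashCode() is the int value itself, which keeps the numbers readable.
        System.out.println(currentHash(1L, 2, 0) + " vs " + currentHash(2L, 1, 0));
        System.out.println(chainedHash(1L, 2, 0) + " vs " + chainedHash(2L, 1, 0));
    }
}
```

With the current formula, swapping the folded timestamp and the value hash yields the same result (for example timestamp 1 with value hash 2, and timestamp 2 with value hash 1); the chained variant distinguishes the two.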

> SharedBuffer creates self-loops when having elements with same 
> value/timestamp.
> ---
>
> Key: FLINK-6578
> URL: https://issues.apache.org/jira/browse/FLINK-6578
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.3.0
>
>
> This is a test that fails with the current implementation due to the fact 
> that the looping state accepts the two {{middleEvent1}} elements but the 
> shared buffer cannot distinguish between them and gets trapped in an infinite 
> loop leading to running out of memory.
> {code}
> @Test
> public void testEagerZeroOrMoreSameElement() {
>   List<StreamRecord<Event>> inputEvents = new ArrayList<>();
>   Event startEvent = new Event(40, "c", 1.0);
>   Event middleEvent1 = new Event(41, "a", 2.0);
>   Event middleEvent2 = new Event(42, "a", 3.0);
>   Event middleEvent3 = new Event(43, "a", 4.0);
>   Event end1 = new Event(44, "b", 5.0);
>   inputEvents.add(new StreamRecord<>(startEvent, 1));
>   inputEvents.add(new StreamRecord<>(middleEvent1, 3));
>   inputEvents.add(new StreamRecord<>(middleEvent1, 3));
>   inputEvents.add(new StreamRecord<>(middleEvent1, 3));
>   inputEvents.add(new StreamRecord<>(middleEvent2, 4));
>   inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
>   inputEvents.add(new StreamRecord<>(middleEvent3, 6));
>   inputEvents.add(new StreamRecord<>(middleEvent3, 6));
>   inputEvents.add(new StreamRecord<>(end1, 7));
>   Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
>     private static final long serialVersionUID = 5726188262756267490L;
>     @Override
>     public boolean filter(Event value) throws Exception {
>       return value.getName().equals("c");
>     }
>   }).followedBy("middle").where(new SimpleCondition<Event>() {
>     private static final long serialVersionUID = 5726188262756267490L;
>     @Override
>     public boolean filter(Event value) throws Exception {
>       return value.getName().equals("a");
>     }
>   }).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
>     private static final long serialVersionUID = 5726188262756267490L;
>     @Override
>     public boolean filter(Event value) throws Exception {
>       return value.getName().equals("b");
>     }
>   });
>   NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
>   final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
>   compareMaps(resultingPatterns, Lists.newArrayList(
>     Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
>     Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
>     Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
>     Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
>     Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
>     Lists.newArrayList(startEvent, middleEvent1, end1),
>     Lists.newArrayList(startEvent, end1)
>   ));
> }
> {code}





[jira] [Commented] (FLINK-6604) Remove Java Serialization from the CEP library.

2017-05-17 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014894#comment-16014894
 ] 

Ted Yu commented on FLINK-6604:
---

I was talking about DeweyNumberSerializer in the above comment.

> Remove Java Serialization from the CEP library.
> ---
>
> Key: FLINK-6604
> URL: https://issues.apache.org/jira/browse/FLINK-6604
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>






[jira] [Commented] (FLINK-6604) Remove Java Serialization from the CEP library.

2017-05-17 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014893#comment-16014893
 ] 

Ted Yu commented on FLINK-6604:
---

equals() only checks the class.
I think we'd better align with hashCode() to be consistent.

> Remove Java Serialization from the CEP library.
> ---
>
> Key: FLINK-6604
> URL: https://issues.apache.org/jira/browse/FLINK-6604
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>






[jira] [Commented] (FLINK-6583) Enable QueryConfig in count base GroupWindow

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014847#comment-16014847
 ] 

ASF GitHub Bot commented on FLINK-6583:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r117122323
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate(
   "non-windowed GroupBy aggregation.")
 }
 
+val isCountWindow = window match {
+  case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true
+  case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true
+  case _ => false
+}
+
+if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
+  LOG.warn(
--- End diff --

Should this be error() ?


> Enable QueryConfig in count base GroupWindow
> 
>
> Key: FLINK-6583
> URL: https://issues.apache.org/jira/browse/FLINK-6583
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0, 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0, 1.4.0
>
>
> Enable QueryConfig in count base GroupWindow by Add a custom Trigger 
> `CountTriggerWithCleanupState`. See more in FLINK-6491.





[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-17 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r117122323
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate(
   "non-windowed GroupBy aggregation.")
 }
 
+val isCountWindow = window match {
+  case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true
+  case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true
+  case _ => false
+}
+
+if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
+  LOG.warn(
--- End diff --

Should this be error() ?




[jira] [Commented] (FLINK-6583) Enable QueryConfig in count base GroupWindow

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014839#comment-16014839
 ] 

ASF GitHub Bot commented on FLINK-6583:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r117121382
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -245,6 +264,7 @@ object DataStreamGroupWindowAggregate {
 case SlidingGroupWindow(_, timeField, size, slide)
 if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
   stream.countWindow(toLong(size), toLong(slide))
+  .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide)));
--- End diff --

Should this be toLong(size) ?


> Enable QueryConfig in count base GroupWindow
> 
>
> Key: FLINK-6583
> URL: https://issues.apache.org/jira/browse/FLINK-6583
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0, 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0, 1.4.0
>
>
> Enable QueryConfig in count base GroupWindow by Add a custom Trigger 
> `CountTriggerWithCleanupState`. See more in FLINK-6491.





[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-17 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r117121382
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -245,6 +264,7 @@ object DataStreamGroupWindowAggregate {
 case SlidingGroupWindow(_, timeField, size, slide)
 if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
   stream.countWindow(toLong(size), toLong(slide))
+  .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide)));
--- End diff --

Should this be toLong(size) ?




[jira] [Commented] (FLINK-6065) Make TransportClient for ES5 pluggable

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014817#comment-16014817
 ] 

ASF GitHub Bot commented on FLINK-6065:
---

GitHub user sschaef opened a pull request:

https://github.com/apache/flink/pull/3934

[FLINK-6065] Add initClient method to ElasticsearchApiCallBridge

This adds the method `initClient` to `ElasticsearchApiCallBridge` in
order to resolve FLINK-6065. This new method takes as argument a
function that can create a `TransportClient`. This is required in order
to not hardcode the `TransportClient` in the implementation of
`createClient`. `createClient` continues to exist in order to allow
backwards compatibility.

No tests provided because I couldn't find any existing test class that
tests the implementation of `ElasticsearchApiCallBridge`.



This is my first contribution, I haven't signed the ICLA yet (which I will 
do next). I didn't provide any tests yet because I had no idea how the 
functionality should be tested or where a test class should be added. If you 
would like to see tests for the changes, please tell me.

The change fixes the ticket but maybe you would like to see a different way 
on how to fix the issue.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sschaef/flink flink-6065

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3934.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3934


commit 2815b0c9ca407838611ba60b9c35a0ac550005e3
Author: Simon Schäfer 
Date:   2017-05-17T20:24:14Z

[FLINK-6065] Add initClient method to ElasticsearchApiCallBridge

This adds the method `initClient` to `ElasticsearchApiCallBridge` in
order to resolve FLINK-6065. This new method takes as argument a
function that can create a `TransportClient`. This is required in order
to not hardcode the `TransportClient` in the implementation of
`createClient`. `createClient` continues to exist in order to allow
backwards compatibility.

No tests provided because I couldn't find any existing test class that
tests the implementation of `ElasticsearchApiCallBridge`.




> Make TransportClient for ES5 pluggable
> --
>
> Key: FLINK-6065
> URL: https://issues.apache.org/jira/browse/FLINK-6065
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector, Streaming Connectors
>Reporter: Robert Metzger
>
> This JIRA is based on a user request: 
> http://stackoverflow.com/questions/42807454/flink-xpack-elasticsearch-5-elasticsearchsecurityexception-missing-autentication?noredirect=1#comment72728053_42807454
> Currently, in the {{Elasticsearch5ApiCallBridge}} the 
> {{PreBuiltTransportClient}} is hardcoded. It would be nice to make this 
> client pluggable to allow using other clients such as the 
> {{PreBuiltXPackTransportClient}}.





[GitHub] flink pull request #3934: [FLINK-6065] Add initClient method to Elasticsearc...

2017-05-17 Thread sschaef
GitHub user sschaef opened a pull request:

https://github.com/apache/flink/pull/3934

[FLINK-6065] Add initClient method to ElasticsearchApiCallBridge

This adds the method `initClient` to `ElasticsearchApiCallBridge` in
order to resolve FLINK-6065. This new method takes as argument a
function that can create a `TransportClient`. This is required in order
to not hardcode the `TransportClient` in the implementation of
`createClient`. `createClient` continues to exist in order to allow
backwards compatibility.

No tests provided because I couldn't find any existing test class that
tests the implementation of `ElasticsearchApiCallBridge`.



This is my first contribution, I haven't signed the ICLA yet (which I will 
do next). I didn't provide any tests yet because I had no idea how the 
functionality should be tested or where a test class should be added. If you 
would like to see tests for the changes, please tell me.

The change fixes the ticket but maybe you would like to see a different way 
on how to fix the issue.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sschaef/flink flink-6065

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3934.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3934


commit 2815b0c9ca407838611ba60b9c35a0ac550005e3
Author: Simon Schäfer 
Date:   2017-05-17T20:24:14Z

[FLINK-6065] Add initClient method to ElasticsearchApiCallBridge

This adds the method `initClient` to `ElasticsearchApiCallBridge` in
order to resolve FLINK-6065. This new method takes as argument a
function that can create a `TransportClient`. This is required in order
to not hardcode the `TransportClient` in the implementation of
`createClient`. `createClient` continues to exist in order to allow
backwards compatibility.

No tests provided because I couldn't find any existing test class that
tests the implementation of `ElasticsearchApiCallBridge`.






[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-17 Thread Andrey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014647#comment-16014647
 ] 

Andrey commented on FLINK-6613:
---

Hi Dmytro.

Several problems still exist:
1) I want to fail fast and kill the JVM if it spends too much time doing GC, so 
we can't simply remove the "UseGCOverheadLimit" option.
2) Even if I remove this option, the root cause will still exist: the Kafka 
integration handles large messages incorrectly. It reads as many messages as 
it can from Kafka, and that leads to OOM. GC settings are irrelevant here, 
because these messages should not and will not be eligible for GC.
3) If you recommend G1, then the default startup scripts should be changed.

Have you tried to reproduce the issue?
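
To picture the "number of messages in-flight" constraint from the ticket, here is a minimal sketch of a bounded buffer between a fetching thread and a processing thread. It is not Flink's `Handover` class; `BoundedHandoverSketch` and its methods are hypothetical, and the capacity counts batches rather than bytes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedHandoverSketch {

    // At most 'capacity' batches are buffered between fetcher and processor.
    private final BlockingQueue<byte[]> inFlight;

    BoundedHandoverSketch(int capacity) {
        this.inFlight = new ArrayBlockingQueue<byte[]>(capacity);
    }

    // Blocking hand-off for the fetching thread: waits while the buffer is full,
    // so the fetcher cannot run ahead of the processor without limit.
    void produce(byte[] batch) throws InterruptedException {
        inFlight.put(batch);
    }

    // Non-blocking hand-off: returns false instead of growing the buffer.
    boolean tryProduce(byte[] batch) {
        return inFlight.offer(batch);
    }

    // Non-blocking take for the processing thread: returns null if nothing is buffered.
    byte[] tryTake() {
        return inFlight.poll();
    }

    public static void main(String[] args) {
        BoundedHandoverSketch handover = new BoundedHandoverSketch(2);
        System.out.println(handover.tryProduce(new byte[16])); // accepted
        System.out.println(handover.tryProduce(new byte[16])); // accepted
        System.out.println(handover.tryProduce(new byte[16])); // rejected, buffer is full
        handover.tryTake();
        System.out.println(handover.tryProduce(new byte[16])); // accepted again
    }
}
```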

> OOM during reading big messages from Kafka
> --
>
> Key: FLINK-6613
> URL: https://issues.apache.org/jira/browse/FLINK-6613
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0
>Reporter: Andrey
>
> Steps to reproduce:
> 1) Setup Task manager with 2G heap size
> 2) Setup job that reads messages from Kafka 10 (i.e. FlinkKafkaConsumer010)
> 3) Send 3300 messages each 635Kb. So total size is ~2G
> 4) OOM in task manager.
> According to heap dump:
> 1) KafkaConsumerThread read messages with total size ~1G.
> 2) Pass them to the next operator using 
> org.apache.flink.streaming.connectors.kafka.internal.Handover
> 3) Then began to read another batch of messages. 
> 4) Task manager was able to read next batch of ~500Mb messages until OOM.
> Expected:
> 1) Either have constraint like "number of messages in-flight" OR
> 2) Read next batch of messages only when previous batch processed OR
> 3) Any other option which will solve OOM.





[jira] [Commented] (FLINK-6608) Relax Kerberos login contexts parsing by trimming whitespaces in contexts list

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014506#comment-16014506
 ] 

ASF GitHub Bot commented on FLINK-6608:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/3928#discussion_r117068349
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -230,10 +231,21 @@ private void validate() {
}
 
private static List<String> parseList(String value) {
-   if(value == null) {
+   if(value == null || value.isEmpty()) {
return Collections.emptyList();
}
-   return Arrays.asList(value.split(","));
--- End diff --

An alternative to consider:
`Collections.list(new StringTokenizer(",X, ,,Y , Z,", ", "))` produces `[X, 
Y, Z]`.



> Relax Kerberos login contexts parsing by trimming whitespaces in contexts list
> --
>
> Key: FLINK-6608
> URL: https://issues.apache.org/jira/browse/FLINK-6608
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Security
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Minor
> Fix For: 1.3.0, 1.4.0
>
>
> The Kerberos login contexts list parsing right now isn't quite user-friendly.
> The list must be provided as: {{security.kerberos.login.contexts: 
> Client,KafkaClient}}, without any whitespace in between.
> We can relax this to be more user-friendly by trimming any whitespaces in the 
> list.
> A user actually stumbled across this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problems-with-Kerberos-Kafka-connection-in-version-1-2-0-td12580.html#a12589





[GitHub] flink pull request #3928: [FLINK-6608] [security, config] Relax Kerberos log...

2017-05-17 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/3928#discussion_r117068349
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -230,10 +231,21 @@ private void validate() {
}
 
private static List<String> parseList(String value) {
-   if(value == null) {
+   if(value == null || value.isEmpty()) {
return Collections.emptyList();
}
-   return Arrays.asList(value.split(","));
--- End diff --

An alternative to consider:
`Collections.list(new StringTokenizer(",X, ,,Y , Z,", ", "))` produces `[X, 
Y, Z]`.





[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014378#comment-16014378
 ] 

ASF GitHub Bot commented on FLINK-6606:
---

GitHub user EronWright opened a pull request:

https://github.com/apache/flink/pull/3933

[FLINK-6606] Create checkpoint hook with user classloader

- wrap calls to MasterTriggerRestoreHook (and its factory) such that the 
user classloader is applied
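
A minimal sketch of the wrapping idea, assuming it amounts to temporarily setting the thread's context classloader around each call and restoring it afterwards. `ContextClassLoaderSketch`, `Action` and `callWithClassLoader` are hypothetical names, not the classes introduced by this pull request.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ContextClassLoaderSketch {

    // Hypothetical callback type standing in for a hook or factory call.
    interface Action<T> {
        T run();
    }

    // Runs 'action' with 'userClassLoader' as the thread context classloader and
    // restores the previous classloader afterwards, even if the action throws.
    static <T> T callWithClassLoader(ClassLoader userClassLoader, Action<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(userClassLoader);
        try {
            return action.run();
        } finally {
            thread.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) {
        // An empty URLClassLoader stands in for the user-code classloader.
        final ClassLoader userClassLoader = new URLClassLoader(new URL[0]);

        boolean applied = callWithClassLoader(userClassLoader, new Action<Boolean>() {
            @Override
            public Boolean run() {
                // Code such as ServiceLoader.load(Class) consults this classloader.
                return Thread.currentThread().getContextClassLoader() == userClassLoader;
            }
        });
        System.out.println(applied);
    }
}
```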


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EronWright/flink FLINK-6606

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3933.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3933


commit fbf904a60a1e252944e1cbc7ad60c5d95ae28ec2
Author: Wright, Eron 
Date:   2017-05-17T16:46:13Z

FLINK-6606
- wrap calls to MasterTriggerRestoreHook (and its factory) such that the 
user classloader is applied




> Create checkpoint hook with user classloader
> 
>
> Key: FLINK-6606
> URL: https://issues.apache.org/jira/browse/FLINK-6606
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Flink should set the thread's classloader when calling the checkpoint hook 
> factory's `create` method.   Without that, the hook is likely to fail during 
> initialization (e.g. using ServiceLoader). 





[GitHub] flink pull request #3933: [FLINK-6606] Create checkpoint hook with user clas...

2017-05-17 Thread EronWright
GitHub user EronWright opened a pull request:

https://github.com/apache/flink/pull/3933

[FLINK-6606] Create checkpoint hook with user classloader

- wrap calls to MasterTriggerRestoreHook (and its factory) such that the 
user classloader is applied


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EronWright/flink FLINK-6606

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3933.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3933


commit fbf904a60a1e252944e1cbc7ad60c5d95ae28ec2
Author: Wright, Eron 
Date:   2017-05-17T16:46:13Z

FLINK-6606
- wrap calls to MasterTriggerRestoreHook (and its factory) such that the 
user classloader is applied






[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-17 Thread Dmytro Shkvyra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014357#comment-16014357
 ] 

Dmytro Shkvyra commented on FLINK-6613:
---

Hi [~dernasherbrezon], I assume that you are using ParallelGC.
Please see 
https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/parallel.html
 
{quote}
The parallel collector throws an OutOfMemoryError if too much time is being 
spent in garbage collection (GC): If more than 98% of the total time is spent 
in garbage collection and less than 2% of the heap is recovered, then an 
OutOfMemoryError is thrown. This feature is designed to prevent applications 
from running for an extended period of time while making little or no progress 
because the heap is too small. If necessary, this feature can be disabled by 
adding the option -XX:-UseGCOverheadLimit to the command line.
{quote}
and if 
{quote}
3) Send 3300 messages each 635Kb. So total size is ~2G
{quote}
ParallelGC can't collect all the garbage in time.
BTW, there are two parallel GC algorithms (see 
http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/gc01/index.html), 
and the old one also cleans the old generation.
I think we can close this ticket, because it could be solved by using G1 and 
is out of scope for Flink.

> OOM during reading big messages from Kafka
> --
>
> Key: FLINK-6613
> URL: https://issues.apache.org/jira/browse/FLINK-6613
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0
>Reporter: Andrey
>
> Steps to reproduce:
> 1) Setup Task manager with 2G heap size
> 2) Setup job that reads messages from Kafka 10 (i.e. FlinkKafkaConsumer010)
> 3) Send 3300 messages each 635Kb. So total size is ~2G
> 4) OOM in task manager.
> According to heap dump:
> 1) KafkaConsumerThread read messages with total size ~1G.
> 2) Pass them to the next operator using 
> org.apache.flink.streaming.connectors.kafka.internal.Handover
> 3) Then began to read another batch of messages. 
> 4) Task manager was able to read next batch of ~500Mb messages until OOM.
> Expected:
> 1) Either have constraint like "number of messages in-flight" OR
> 2) Read next batch of messages only when previous batch processed OR
> 3) Any other option which will solve OOM.





[GitHub] flink pull request #3929: [FLINK-6543] [table] Deprecate toDataStream

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3929#discussion_r117045701
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 ---
@@ -144,13 +144,119 @@ class StreamTableEnvironment(
 * types: Fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
 *
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  @Deprecated
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = 
toAppendStream(table, clazz)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete changes, the conversion will fail.
+*
+* The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+* - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+* types: Fields are mapped by position, field types must match.
+* - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+*
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param typeInfo The [[TypeInformation]] that specifies the type of 
the [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): 
DataStream[T] =
+toAppendStream(table, typeInfo)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete changes, the conversion will fail.
+*
+* The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+* - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+* types: Fields are mapped by position, field types must match.
+* - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+*
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @param queryConfig The configuration of the query to generate.
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  def toDataStream[T](
+  table: Table,
+  clazz: Class[T],
+  queryConfig: StreamQueryConfig): DataStream[T] = 
toAppendStream(table, clazz, queryConfig)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete changes, the conversion will fail.
+*
+* The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+* - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+* types: 

[GitHub] flink pull request #3929: [FLINK-6543] [table] Deprecate toDataStream

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3929#discussion_r117045613
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 ---
@@ -144,13 +144,119 @@ class StreamTableEnvironment(
 * types: Fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
 *
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  @Deprecated
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = 
toAppendStream(table, clazz)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete changes, the conversion will fail.
+*
+* The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+* - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+* types: Fields are mapped by position, field types must match.
+* - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+*
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param typeInfo The [[TypeInformation]] that specifies the type of 
the [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): 
DataStream[T] =
--- End diff --

add `@Deprecated` annotation
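
The comment above asks for the annotation in addition to the `@deprecated` doc tag, which the later overloads in the diff already carry. A minimal Java sketch of that pattern follows. `ConversionSketch`, its `List`-based signatures, and the `isDeprecated` helper are hypothetical stand-ins, not Flink's classes:

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a table environment; not Flink's real class.
class ConversionSketch {

    /** Preferred entry point: the name states that only append-only input is supported. */
    static <T> List<T> toAppendStream(List<T> rows) {
        return rows;
    }

    /**
     * Legacy entry point, kept so existing callers still compile.
     *
     * @deprecated Only supports append-only input; use {@link #toAppendStream(List)} instead.
     */
    @Deprecated
    static <T> List<T> toDataStream(List<T> rows) {
        // Delegate so that both names share one implementation.
        return toAppendStream(rows);
    }

    /** Returns true if the first declared method with this name carries @Deprecated. */
    static boolean isDeprecated(String methodName) {
        for (Method m : ConversionSketch.class.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                return m.isAnnotationPresent(Deprecated.class);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("toDataStream deprecated: " + isDeprecated("toDataStream"));
        System.out.println("toAppendStream deprecated: " + isDeprecated("toAppendStream"));
        System.out.println(toDataStream(Arrays.asList("a", "b")));
    }
}
```

The doc tag only affects generated documentation, whereas the annotation is what the compiler and reflection-based tools see. That is presumably why the review asks for both.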




[GitHub] flink pull request #3929: [FLINK-6543] [table] Deprecate toDataStream

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3929#discussion_r117050100
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 ---
@@ -144,13 +144,119 @@ class StreamTableEnvironment(
 * types: Fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
 *
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  @Deprecated
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = 
toAppendStream(table, clazz)
--- End diff --

This method is still referenced from the `java` `package-info.java` file 
and the `java` `SqlITCase`.




[jira] [Commented] (FLINK-6543) Deprecate toDataStream

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014352#comment-16014352
 ] 

ASF GitHub Bot commented on FLINK-6543:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3929#discussion_r117045613
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 ---
@@ -144,13 +144,119 @@ class StreamTableEnvironment(
 * types: Fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
 *
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  @Deprecated
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = 
toAppendStream(table, clazz)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete changes, the conversion will fail.
+*
+* The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+* - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+* types: Fields are mapped by position, field types must match.
+* - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+*
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param typeInfo The [[TypeInformation]] that specifies the type of 
the [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): 
DataStream[T] =
--- End diff --

add `@Deprecated` annotation


> Deprecate toDataStream
> --
>
> Key: FLINK-6543
> URL: https://issues.apache.org/jira/browse/FLINK-6543
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.3.0
>
>
> With retraction support, we should deprecate {{toDataStream}} and introduce a 
> new {{toAppendStream}} to clearly differentiate between retraction and 
> non-retraction.
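
The distinction the issue draws can be illustrated with a small Java sketch. All names here (`AppendVsRetractSketch`, `change`, `toAppend`, `toRetract`) are hypothetical and this is not Flink's implementation. It models each change as a (flag, row) pair, where `true` marks an add message and `false` a retract message, and it checks rows one by one as a simplification:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical model of a table changelog; not Flink's real API.
class AppendVsRetractSketch {

    /** One change: key true = add message, key false = retract message. */
    static Map.Entry<Boolean, String> change(boolean isAdd, String row) {
        return new SimpleEntry<>(isAdd, row);
    }

    /** Append conversion: accepts inserts only and fails on any retraction. */
    static List<String> toAppend(List<Map.Entry<Boolean, String>> changes) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Boolean, String> c : changes) {
            if (!c.getKey()) {
                throw new IllegalStateException("table is not append-only: " + c.getValue());
            }
            out.add(c.getValue());
        }
        return out;
    }

    /** Retract conversion: passes every change through together with its flag. */
    static List<Map.Entry<Boolean, String>> toRetract(List<Map.Entry<Boolean, String>> changes) {
        return new ArrayList<>(changes);
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> insertsOnly =
                Arrays.asList(change(true, "a"), change(true, "b"));
        // An update is modelled as a retraction of the old row plus an add of the new one.
        List<Map.Entry<Boolean, String>> withUpdate =
                Arrays.asList(change(true, "a"), change(false, "a"), change(true, "a2"));

        System.out.println(toAppend(insertsOnly));
        System.out.println(toRetract(withUpdate));
    }
}
```

With separate method names, a caller has to state which of the two contracts is expected, which is the clarity the issue asks for.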



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3929: [FLINK-6543] [table] Deprecate toDataStream

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3929#discussion_r117045675
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 ---
@@ -144,13 +144,119 @@ class StreamTableEnvironment(
 * types: Fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
 *
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  @Deprecated
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = 
toAppendStream(table, clazz)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete changes, the conversion will fail.
+*
+* The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+* - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+* types: Fields are mapped by position, field types must match.
+* - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+*
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param typeInfo The [[TypeInformation]] that specifies the type of 
the [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): 
DataStream[T] =
+toAppendStream(table, typeInfo)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete changes, the conversion will fail.
+*
+* The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+* - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+* types: Fields are mapped by position, field types must match.
+* - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+*
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @param queryConfig The configuration of the query to generate.
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  def toDataStream[T](
--- End diff --

add `@Deprecated` annotation




[jira] [Commented] (FLINK-6543) Deprecate toDataStream

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014353#comment-16014353
 ] 

ASF GitHub Bot commented on FLINK-6543:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3929#discussion_r117050100
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 ---
@@ -144,13 +144,119 @@ class StreamTableEnvironment(
 * types: Fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
 *
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  @Deprecated
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = 
toAppendStream(table, clazz)
--- End diff --

This method is still referenced from the `java` `package-info.java` file 
and the `java` `SqlITCase`.


> Deprecate toDataStream
> --
>
> Key: FLINK-6543
> URL: https://issues.apache.org/jira/browse/FLINK-6543
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.3.0
>
>
> With retraction support, we should deprecate {{toDataStream}} and introduce a 
> new {{toAppendStream}} to clearly differentiate between retraction and 
> non-retraction.





[jira] [Commented] (FLINK-6543) Deprecate toDataStream

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014351#comment-16014351
 ] 

ASF GitHub Bot commented on FLINK-6543:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3929#discussion_r117045701
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 ---
@@ -144,13 +144,119 @@ class StreamTableEnvironment(
 * types: Fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
 *
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  @Deprecated
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = 
toAppendStream(table, clazz)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete changes, the conversion will fail.
+*
+* The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+* - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+* types: Fields are mapped by position, field types must match.
+* - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+*
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param typeInfo The [[TypeInformation]] that specifies the type of 
the [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): 
DataStream[T] =
+toAppendStream(table, typeInfo)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete changes, the conversion will fail.
+*
+* The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+* - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+* types: Fields are mapped by position, field types must match.
+* - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+*
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @param queryConfig The configuration of the query to generate.
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  def toDataStream[T](
+  table: Table,
+  clazz: Class[T],
+  queryConfig: StreamQueryConfig): DataStream[T] = 
toAppendStream(table, clazz, queryConfig)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete 

[jira] [Commented] (FLINK-6543) Deprecate toDataStream

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014354#comment-16014354
 ] 

ASF GitHub Bot commented on FLINK-6543:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3929#discussion_r117045675
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 ---
@@ -144,13 +144,119 @@ class StreamTableEnvironment(
 * types: Fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
 *
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  @Deprecated
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = 
toAppendStream(table, clazz)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete changes, the conversion will fail.
+*
+* The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+* - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+* types: Fields are mapped by position, field types must match.
+* - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+*
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param typeInfo The [[TypeInformation]] that specifies the type of 
the [[DataStream]].
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): 
DataStream[T] =
+toAppendStream(table, typeInfo)
+
+  /**
+* Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+*
+* The [[Table]] must only have insert (append) changes. If the 
[[Table]] is also modified
+* by update or delete changes, the conversion will fail.
+*
+* The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+* - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+* types: Fields are mapped by position, field types must match.
+* - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+*
+* NOTE: This method only supports conversion of append-only tables. In 
order to make this
+* more explicit in the future, please use [[toAppendStream()]] instead.
+* If add and retract messages are required, use [[toRetractStream()]].
+*
+* @param table The [[Table]] to convert.
+* @param clazz The class of the type of the resulting [[DataStream]].
+* @param queryConfig The configuration of the query to generate.
+* @tparam T The type of the resulting [[DataStream]].
+* @return The converted [[DataStream]].
+* @deprecated This method only supports conversion of append-only 
tables. In order to
+*make this more explicit in the future, please use 
toAppendStream() instead.
+*/
+  def toDataStream[T](
--- End diff --

add `@Deprecated` annotation


> Deprecate toDataStream
> --
>
> Key: FLINK-6543
> URL: https://issues.apache.org/jira/browse/FLINK-6543
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 

[jira] [Commented] (FLINK-6616) Clarify provenance of official Docker images

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014348#comment-16014348
 ] 

ASF GitHub Bot commented on FLINK-6616:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3932

[FLINK-6616] [docs] Clarify provenance of official Docker images

Note that the official Docker images for Flink are community supported and 
not an official release of the Apache Flink PMC.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
6616_clarify_provenance_of_official_docker_images

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3932.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3932


commit 26914aa7f49d5e458ded35926aa944699821db5c
Author: Greg Hogan 
Date:   2017-05-17T16:27:34Z

[FLINK-6616] [docs] Clarify provenance of official Docker images

Note that the official Docker images for Flink are community supported
and not an official release of the Apache Flink PMC.




> Clarify provenance of official Docker images
> 
>
> Key: FLINK-6616
> URL: https://issues.apache.org/jira/browse/FLINK-6616
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Critical
> Fix For: 1.3.0
>
>
> Note that the official Docker images for Flink are community supported and 
> not an official release of the Apache Flink PMC.





[GitHub] flink pull request #3932: [FLINK-6616] [docs] Clarify provenance of official...

2017-05-17 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3932

[FLINK-6616] [docs] Clarify provenance of official Docker images

Note that the official Docker images for Flink are community supported and 
not an official release of the Apache Flink PMC.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
6616_clarify_provenance_of_official_docker_images

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3932.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3932


commit 26914aa7f49d5e458ded35926aa944699821db5c
Author: Greg Hogan 
Date:   2017-05-17T16:27:34Z

[FLINK-6616] [docs] Clarify provenance of official Docker images

Note that the official Docker images for Flink are community supported
and not an official release of the Apache Flink PMC.






[jira] [Created] (FLINK-6616) Clarify provenance of official Docker images

2017-05-17 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-6616:
-

 Summary: Clarify provenance of official Docker images
 Key: FLINK-6616
 URL: https://issues.apache.org/jira/browse/FLINK-6616
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.3.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Critical
 Fix For: 1.3.0


Note that the official Docker images for Flink are community supported and not 
an official release of the Apache Flink PMC.





[jira] [Commented] (FLINK-6031) Add parameter for per job yarn clusters to control whether the user code jar is included into the system classloader.

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014338#comment-16014338
 ] 

ASF GitHub Bot commented on FLINK-6031:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3931

[FLINK-6031][yarn] Add config parameter for user-jar inclusion in cla…

This PR adds a config parameter that controls how user jars are handled 
with regard to the system class path for per-job YARN clusters.

The parameter allows the following:
* disable the inclusion in the system class path and use the user 
class path instead ("DISABLE")
* prepend the user jars to the system class path ("FIRST")
* append the user jars to the system class path ("LAST")
* (default) add the user jars to the system class path based on the 
lexicographic order ("ORDER")
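
The four modes above can be sketched in Java as follows. This is only an illustration: the enum name `UserJarInclusion`, the method `systemClasspath`, and the jar names are assumptions, and the sketch also assumes that "ORDER" sorts system and user jars together, which the PR description does not spell out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; only the four option values come from the PR description.
class ClasspathOrderSketch {

    enum UserJarInclusion { DISABLE, FIRST, LAST, ORDER }

    /** Builds the system class path from system and user jars according to the mode. */
    static List<String> systemClasspath(
            List<String> systemJars, List<String> userJars, UserJarInclusion mode) {
        List<String> result = new ArrayList<>();
        switch (mode) {
            case DISABLE:
                // User jars stay out of the system class path entirely.
                result.addAll(systemJars);
                break;
            case FIRST:
                result.addAll(userJars);
                result.addAll(systemJars);
                break;
            case LAST:
                result.addAll(systemJars);
                result.addAll(userJars);
                break;
            case ORDER:
                // Assumption: all jars are merged and sorted lexicographically.
                result.addAll(systemJars);
                result.addAll(userJars);
                Collections.sort(result);
                break;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> system = Arrays.asList("flink-dist.jar", "log4j.jar");
        List<String> user = Arrays.asList("app.jar", "zlib.jar");
        for (UserJarInclusion mode : UserJarInclusion.values()) {
            System.out.println(mode + " -> " + systemClasspath(system, user, mode));
        }
    }
}
```

The position matters because a class that exists in both a user jar and a system jar is loaded from whichever jar comes first on the class path.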

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 6031_yarn_userjars

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3931.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3931






> Add parameter for per job yarn clusters to control whether the user code jar 
> is included into the system classloader.
> -
>
> Key: FLINK-6031
> URL: https://issues.apache.org/jira/browse/FLINK-6031
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
>
> FLINK-4913 added the user jar to the system classloader when starting a 
> Flink per-job YARN cluster.
> Some users experienced issues with the changed behavior.
> I suggest introducing a new YARN-specific configuration parameter (for the 
> flink-conf.yaml file) to control whether the user jar is added to the system 
> classloader.





[GitHub] flink pull request #3931: [FLINK-6031][yarn] Add config parameter for user-j...

2017-05-17 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3931

[FLINK-6031][yarn] Add config parameter for user-jar inclusion in cla…

This PR adds a config parameter that controls how user jars are handled 
with regard to the system class path for per-job YARN clusters.

The parameter allows the following:
* disable the inclusion in the system class path and use the user 
class path instead ("DISABLE")
* prepend the user jars to the system class path ("FIRST")
* append the user jars to the system class path ("LAST")
* (default) add the user jars to the system class path based on the 
lexicographic order ("ORDER")

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 6031_yarn_userjars

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3931.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3931








[jira] [Commented] (FLINK-6613) OOM during reading big messages from Kafka

2017-05-17 Thread Andrey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014307#comment-16014307
 ] 

Andrey commented on FLINK-6613:
---

-Xmx2g and ParallelGC. Default for openjdk.

> OOM during reading big messages from Kafka
> --
>
> Key: FLINK-6613
> URL: https://issues.apache.org/jira/browse/FLINK-6613
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.2.0
> Reporter: Andrey
>
> Steps to reproduce:
> 1) Set up a TaskManager with a 2 GB heap.
> 2) Set up a job that reads messages from Kafka 0.10 (i.e. FlinkKafkaConsumer010).
> 3) Send 3300 messages of 635 KB each, so the total size is ~2 GB.
> 4) The TaskManager fails with an OOM.
> According to the heap dump:
> 1) KafkaConsumerThread read messages with a total size of ~1 GB.
> 2) It passed them to the next operator using 
> org.apache.flink.streaming.connectors.kafka.internal.Handover.
> 3) It then began to read another batch of messages. 
> 4) The TaskManager was able to read the next batch of ~500 MB of messages before the OOM.
> Expected:
> 1) Either have a constraint such as "number of messages in-flight", OR
> 2) Read the next batch of messages only when the previous batch has been processed, OR
> 3) Any other option that solves the OOM.
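
For illustration, the first expected option ("number of messages in-flight")
could look roughly like the sketch below. It is not the connector's Handover
class: BoundedHandover and its methods are hypothetical names, and it only
shows the idea of blocking the fetching thread once a fixed number of records
is pending. (The reported numbers are consistent: 3300 x 635 KB = 2,095,500 KB,
which is roughly 2 GB.)

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical sketch; this is not the connector's Handover class.
final class BoundedHandover[T](maxInFlight: Int) {
  private val queue = new ArrayBlockingQueue[T](maxInFlight)

  /** Called by the fetching thread; blocks while maxInFlight records are pending. */
  def produce(record: T): Unit = queue.put(record)

  /** Called by the processing thread; blocks until a record is available. */
  def poll(): T = queue.take()

  /** Non-blocking variant: returns false instead of buffering past the limit. */
  def tryProduce(record: T): Boolean = queue.offer(record)

  /** Number of records handed over but not yet consumed. */
  def pending: Int = queue.size()
}
```

A limit on the record count only bounds memory if the record size is bounded
as well, so a byte-based budget may be the better fit for large messages.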





[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014288#comment-16014288
 ] 

ASF GitHub Bot commented on FLINK-4565:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3502
  
Hi @DmytroShkvyra, I'm sorry for the long time without response on this PR.
We're currently in "testing & bugfix mode" for the 1.3 release and I'll be 
gone for two weeks in a few days. I'm sorry, but I won't have much time to look 
at this PR until I return.


> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Dmytro Shkvyra
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.





[GitHub] flink issue #3502: [FLINK-4565] Support for SQL IN operator

2017-05-17 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3502
  
Hi @DmytroShkvyra, I'm sorry for the long time without response on this PR.
We're currently in "testing & bugfix mode" for the 1.3 release and I'll be 
gone for two weeks in a few days. I'm sorry, but I won't have much time to look 
at this PR until I return.




[jira] [Commented] (FLINK-6611) When TaskManager or JobManager restart after crash the PID file contain also the old PID

2017-05-17 Thread Mauro Cortellazzi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014268#comment-16014268
 ] 

Mauro Cortellazzi commented on FLINK-6611:
--

Thank you, [~greghogan], for the answer. I am closing the issue.

> When TaskManager or JobManager restart after crash the PID file contain also 
> the old PID
> 
>
> Key: FLINK-6611
> URL: https://issues.apache.org/jira/browse/FLINK-6611
> Project: Flink
> Issue Type: Task
> Components: Startup Shell Scripts
> Reporter: Mauro Cortellazzi
> Assignee: Mauro Cortellazzi
> Priority: Trivial
>
> When the TaskManager or JobManager restarts after a crash, the PID file also 
> contains the old PID.





[jira] [Closed] (FLINK-6611) When TaskManager or JobManager restart after crash the PID file contain also the old PID

2017-05-17 Thread Mauro Cortellazzi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mauro Cortellazzi closed FLINK-6611.

Resolution: Not A Problem

> When TaskManager or JobManager restart after crash the PID file contain also 
> the old PID
> 
>
> Key: FLINK-6611
> URL: https://issues.apache.org/jira/browse/FLINK-6611
> Project: Flink
> Issue Type: Task
> Components: Startup Shell Scripts
> Reporter: Mauro Cortellazzi
> Assignee: Mauro Cortellazzi
> Priority: Trivial
>
> When the TaskManager or JobManager restarts after a crash, the PID file also 
> contains the old PID.





[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014246#comment-16014246
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116992617
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+
+class DataStreamJoinRule
+  extends ConverterRule(
+  classOf[FlinkLogicalJoin],
+  FlinkConventions.LOGICAL,
+  FlinkConventions.DATASTREAM,
+  "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+
+val joinInfo = join.analyzeCondition
+
+// joins require at least one equi-condition
+!joinInfo.pairs().isEmpty
--- End diff --

We can also do the join without an equi-join condition. In this case, the 
join would run with parallelism 1. 
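
To illustrate the point about parallelism, here is a simplified sketch. It is
not Flink's key-group assignment: JoinPartitioning and subtaskFor are
hypothetical names. With equi-join keys, records with equal keys can be hashed
to the same parallel subtask. Without any key, all records of both inputs have
to meet in a single subtask.

```scala
// Hypothetical helper; a simplification of how a keyed operator distributes records.
object JoinPartitioning {
  /** Picks the parallel subtask for a record given its equi-join key values. */
  def subtaskFor(equiKeys: Seq[Any], parallelism: Int): Int =
    if (equiKeys.isEmpty) 0 // no equi-join key: every record goes to one subtask
    else java.lang.Math.floorMod(equiKeys.hashCode(), parallelism)
}
```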


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: hongyuhong
> Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The 
> time condition only supports bounded time ranges such as {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> unbounded ones such as {{o.proctime > s.proctime}}, and must include the 
> proctime attributes of both streams; {{o.proctime between proctime() and 
> proctime() + 1}} should not be supported either.
> This issue includes:
> * Design of the DataStream operator to deal with stream joins
> * Translation from Calcite's RelNode representation (LogicalJoin). 
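
As a small illustration of the bounded time condition, the sketch below
evaluates the predicate on plain millisecond timestamps. ProctimeBounds and
withinBounds are hypothetical names and not part of the PR. For the example
query, lowerMs would be 0 and upperMs would be 3600000 (one hour).

```scala
// Hypothetical helper; timestamps and bounds are in milliseconds.
object ProctimeBounds {
  /** True if oTime lies in the closed range [sTime - lowerMs, sTime + upperMs]. */
  def withinBounds(oTime: Long, sTime: Long, lowerMs: Long, upperMs: Long): Boolean =
    oTime >= sTime - lowerMs && oTime <= sTime + upperMs
}
```

Because both bounds are finite, a record only has to be kept for a limited
time, which is what makes the state of such a join bounded.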





[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014248#comment-16014248
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116794639
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117033348
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
+  leftStreamWindowSize = -leftStreamWindowSize
+  if (!greatCond.isEqual) {
+leftStreamWindowSize -= 1
+  }
+} else {
+  leftStreamWindowSize = 0
+}
+
+// only 
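
The branch quoted above for leftStreamWindowSize can be restated as a small
standalone function. This is a sketch under assumptions, not the PR's code:
WindowSize and leftWindowSize are hypothetical names, offsetMs is assumed to be
the reduced literal as a signed millisecond offset, and inclusive stands in for
greatCond.isEqual.

```scala
// Hypothetical restatement of the quoted branch; not part of the PR.
object WindowSize {
  /**
    * @param offsetMs  reduced constant offset of the lower-bound condition, in milliseconds
    * @param inclusive true for a ">=" condition, false for a strict ">" condition
    * @return how far back records have to be kept; 0 if no past records are needed
    */
  def leftWindowSize(offsetMs: Long, inclusive: Boolean): Long =
    if (offsetMs < 0) {
      val size = -offsetMs
      if (inclusive) size else size - 1 // a strict bound excludes the boundary itself
    } else {
      0L
    }
}
```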

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014251#comment-16014251
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116790511
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
--- End diff --

-> `leftLogicalFieldCnt`

Add `rightLogicalFieldCnt`


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: hongyuhong
> Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The 
> time condition only supports bounded time ranges such as {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> unbounded ones such as {{o.proctime > s.proctime}}, and must include the 
> proctime attributes of both streams; {{o.proctime between proctime() and 
> proctime() + 1}} should not be supported either.
> This issue includes:
> * Design of the DataStream operator to deal with stream joins
> * Translation from Calcite's RelNode representation (LogicalJoin). 





[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014252#comment-16014252
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116795204
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014256#comment-16014256
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116854249
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117033149
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
+  leftStreamWindowSize = -leftStreamWindowSize
+  if (!greatCond.isEqual) {
+leftStreamWindowSize -= 1
+  }
+} else {
+  leftStreamWindowSize = 0
+}
+
+// only 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014247#comment-16014247
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116791975
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
--- End diff --

`greaterConditions.size > 1 || greaterConditions.size == 0` can be replaced by 
`greaterConditions.size != 1`.
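
The equivalence behind this suggestion can be checked with a small stand-alone sketch. It assumes only that a list size is never negative; the object and method names (`SizeCheckSketch`, `invalidOriginal`, `invalidSimplified`) are hypothetical and not part of the PR.

```scala
// Hypothetical stand-alone check, not code from the PR.
object SizeCheckSketch {

  // The validity check as it is written in the quoted diff.
  def invalidOriginal(greaterSize: Int, lessSize: Int): Boolean =
    greaterSize != lessSize || greaterSize > 1 || greaterSize == 0

  // The simplified form: for a non-negative size,
  // (size > 1 || size == 0) holds exactly when size != 1.
  def invalidSimplified(greaterSize: Int, lessSize: Int): Boolean =
    greaterSize != lessSize || greaterSize != 1

  def main(args: Array[String]): Unit = {
    // Compare both forms over a small grid of sizes.
    for (g <- 0 to 3; l <- 0 to 3) {
      assert(invalidOriginal(g, l) == invalidSimplified(g, l))
    }
    println("both conditions agree for all sizes in 0..3")
  }
}
```

Both forms accept only the case where each list holds exactly one condition.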


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014253#comment-16014253
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116791061
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
--- End diff --

Typo: +r -> `greaterConditions` (rename `greateConditions`).


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time-condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} only can use proctime that is a system attribute, the 
> time condition only support bounded time range like {{o.proctime BETWEEN 
> s.proctime - 

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117006282
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.runtime.FilterRunner
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze join condition to get equi-conditon and other condition
+* @param  joinNode   logicaljoin node
+* @param  expression the function to generate condition string
+*/
+  private[flink] def analyzeJoinCondition(
+joinNode: FlinkLogicalJoin,
+expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
+
+val joinInfo = joinNode.analyzeCondition()
+val keyPairs = joinInfo.pairs.toList
+val otherCondition =
+  if(joinInfo.isEqui) null
+  else joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
+
+val leftKeys = ArrayBuffer.empty[Int]
+val rightKeys = ArrayBuffer.empty[Int]
+if (!keyPairs.isEmpty) {
+  val leftFields = joinNode.getLeft.getRowType.getFieldList
+  val rightFields = joinNode.getRight.getRowType.getFieldList
+
+  keyPairs.foreach(pair => {
+val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+
+// check if keys are compatible
+if (leftKeyType == rightKeyType) {
+  // add key pair
+  leftKeys.append(pair.source)
+  rightKeys.append(pair.target)
+} else {
+  throw TableException(
+"Equality join predicate on incompatible types.\n" +
+  s"\tLeft: ${joinNode.getLeft.toString},\n" +
+  s"\tRight: ${joinNode.getRight.toString},\n" +
+  s"\tCondition: (${expression(joinNode.getCondition,
+joinNode.getRowType.getFieldNames.toList, None)})"
+  )
+}
+  })
+}
+(leftKeys.toArray, rightKeys.toArray, otherCondition)
+  }
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
--- End diff --

I think the logic of this function is correct. However, I find it a bit 
hard to follow because it starts with many conditions.
What do you think about the following approach:

1. convert 

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117037366
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
--- End diff --

This method needs some good unit tests.
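
As a rough illustration of what such tests could cover, the sketch below pulls the left-window arithmetic that appears in the body of `analyzeTimeBoundary` (quoted in other mails of this thread) into a pure function and asserts a few boundary cases. The names `WindowSizeSketch`, `leftWindowSize`, `reducedOffset` and `inclusive` are hypothetical, and treating the reduced literal as milliseconds is an assumption. A real test would have to drive `analyzeTimeBoundary` with Calcite `RexNode` inputs.

```scala
// Hypothetical sketch, not code from the PR.
object WindowSizeSketch {

  // reducedOffset: the constant the time expression reduces to
  //                (assumed to be milliseconds; negative means "look back").
  // inclusive:     true for >=, false for > (`isEqual` in the quoted diff).
  def leftWindowSize(reducedOffset: Long, inclusive: Boolean): Long =
    if (reducedOffset < 0) {
      val size = -reducedOffset
      // A strict comparison excludes the boundary itself, so the window is one unit shorter.
      if (inclusive) size else size - 1
    } else {
      // Only past records need to be stored; a non-negative offset needs no window.
      0L
    }

  def main(args: Array[String]): Unit = {
    val hour = 3600000L
    assert(leftWindowSize(-hour, inclusive = true) == hour)
    assert(leftWindowSize(-hour, inclusive = false) == hour - 1)
    assert(leftWindowSize(0L, inclusive = true) == 0L)
    assert(leftWindowSize(hour, inclusive = false) == 0L)
    println("window-size checks passed")
  }
}
```

The cases worth pinning down are the strict versus inclusive comparison and the non-negative offset, since those are the branches the quoted code distinguishes.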




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117027410
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
+  leftStreamWindowSize = -leftStreamWindowSize
+  if (!greatCond.isEqual) {
+leftStreamWindowSize -= 1
+  }
+} else {
+  leftStreamWindowSize = 0
+}
+
+// only 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014262#comment-16014262
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117033348
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014236#comment-16014236
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116797215
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 
< b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime 
indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use 
rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a 
stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, 
rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014264#comment-16014264
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117004436
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
--- End diff --

I think this would make the code in this class a lot simpler, because we 
would not need to recursively dig into the condition. We can iterate over all 
conjunctive conditions, check whether each one is a valid time bound condition, 
and then either remove it or keep it. 
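
The iteration described here can be sketched without any Calcite types. This is a minimal illustration only, assuming the condition has already been flattened into a list of conjuncts; `Conjunct`, `Equi`, `TimeBound` and `split` are hypothetical names and are not part of the pull request or of Flink.

```scala
object TimeBoundSplit {
  // Hypothetical stand-ins for the conjuncts of a join condition.
  sealed trait Conjunct
  // Models an equi-join predicate such as o.orderId = s.orderId.
  final case class Equi(leftField: String, rightField: String) extends Conjunct
  // Models leftTime >= rightTime + offsetMs (isLower = true)
  // or leftTime <= rightTime + offsetMs (isLower = false).
  final case class TimeBound(isLower: Boolean, offsetMs: Long) extends Conjunct

  /** Separates the time bound conjuncts from all remaining conjuncts. */
  def split(conjuncts: Seq[Conjunct]): (Seq[TimeBound], Seq[Conjunct]) = {
    val (timeParts, rest) = conjuncts.partition(_.isInstanceOf[TimeBound])
    (timeParts.collect { case t: TimeBound => t }, rest)
  }
}
```

For the example query in the issue, `o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR` would correspond to one lower bound with offset 0 and one upper bound with offset 3600000 ms, while `o.orderId = s.orderId` stays in the remaining condition.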


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time-condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} only can use proctime that is a system attribute, the 
> time condition only support bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> support unbounded like {{o.proctime > s.protime}},  and  should include both 
> two stream's proctime attribute, {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * 

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117002279
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
--- End diff --

As a first step, the `condition` should be converted into CNF (conjunctive 
normal form) for normalization. Calcite offers the `RexUtil.toCnf()` method for 
that.
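
To illustrate what CNF normalization buys, here is a small self-contained sketch on a toy expression tree. It is not Calcite code: `Expr`, `Atom`, `And`, `Or`, `toCnf` and `conjuncts` are hypothetical names, and negation is left out for brevity. In Calcite itself, `RexUtil.toCnf` takes a `RexBuilder` and a `RexNode`, and `RelOptUtil.conjunctions` is one way to obtain the top-level conjuncts afterwards.

```scala
object CnfSketch {
  // Hypothetical miniature expression tree; Calcite's RexNode is far richer.
  sealed trait Expr
  final case class Atom(name: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr

  /** Rewrites a negation-free expression into conjunctive normal form. */
  def toCnf(e: Expr): Expr = e match {
    case Atom(_)   => e
    case And(l, r) => And(toCnf(l), toCnf(r))
    case Or(l, r)  => distribute(toCnf(l), toCnf(r))
  }

  // Distributes OR over AND: (a AND b) OR c becomes (a OR c) AND (b OR c).
  // Both arguments are expected to be in CNF already.
  private def distribute(l: Expr, r: Expr): Expr = (l, r) match {
    case (And(a, b), c) => And(distribute(a, c), distribute(b, c))
    case (a, And(b, c)) => And(distribute(a, b), distribute(a, c))
    case _              => Or(l, r)
  }

  /** Flattens the top-level AND chain into a list of conjuncts. */
  def conjuncts(e: Expr): List[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => List(other)
  }
}
```

Once the condition is in this form, every element returned by `conjuncts` can be inspected on its own, with no recursive descent into nested AND/OR combinations.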




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117031586
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 
< b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime 
indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use 
rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a 
stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, 
rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
+  leftStreamWindowSize = -leftStreamWindowSize
+  if (!greatCond.isEqual) {
+leftStreamWindowSize -= 1
+  }
+} else {
+  leftStreamWindowSize = 0
+}
+
+// only 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014258#comment-16014258
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117033149
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 
< b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime 
indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use 
rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a 
stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, 
rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014254#comment-16014254
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117001241
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 
< b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime 
indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use 
rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a 
stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, 
rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014260#comment-16014260
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117031586
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 
< b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime 
indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use 
rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a 
stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, 
rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014255#comment-16014255
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116792128
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 
< b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
--- End diff --

The whole condition can be simplified to `lessConditions.size != 1 || 
greateConditions.size != 1`
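
To illustrate why the shorter check is safe, here is a minimal sketch that compares the three-clause guard from the diff with the suggested form over plain list sizes. `ConditionCheck` and its method names are hypothetical and not part of the PR; the sketch only assumes that collection sizes are never negative.

```scala
object ConditionCheck {
  // Guard as written in the diff: true means "reject the join condition".
  def original(greaterSize: Int, lessSize: Int): Boolean =
    greaterSize != lessSize || greaterSize > 1 || greaterSize == 0

  // Guard suggested in the review: reject unless there is exactly one of each.
  def simplified(greaterSize: Int, lessSize: Int): Boolean =
    lessSize != 1 || greaterSize != 1

  // Compare both guards on every pair of sizes in 0..max.
  def equivalentUpTo(max: Int): Boolean =
    (0 to max).forall { g =>
      (0 to max).forall(l => original(g, l) == simplified(g, l))
    }
}
```

Both guards accept only the case where each list holds exactly one condition, so the simplified version should behave identically.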


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014259#comment-16014259
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117006282
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.runtime.FilterRunner
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze join condition to get equi-conditon and other condition
+* @param  joinNode   logicaljoin node
+* @param  expression the function to generate condition string
+*/
+  private[flink] def analyzeJoinCondition(
+joinNode: FlinkLogicalJoin,
+expression: (RexNode, List[String], Option[List[RexNode]]) => String) 
= {
+
+val joinInfo = joinNode.analyzeCondition()
+val keyPairs = joinInfo.pairs.toList
+val otherCondition =
+  if(joinInfo.isEqui) null
+  else joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
+
+val leftKeys = ArrayBuffer.empty[Int]
+val rightKeys = ArrayBuffer.empty[Int]
+if (!keyPairs.isEmpty) {
+  val leftFields = joinNode.getLeft.getRowType.getFieldList
+  val rightFields = joinNode.getRight.getRowType.getFieldList
+
+  keyPairs.foreach(pair => {
+val leftKeyType = 
leftFields.get(pair.source).getType.getSqlTypeName
+val rightKeyType = 
rightFields.get(pair.target).getType.getSqlTypeName
+
+// check if keys are compatible
+if (leftKeyType == rightKeyType) {
+  // add key pair
+  leftKeys.append(pair.source)
+  rightKeys.append(pair.target)
+} else {
+  throw TableException(
+"Equality join predicate on incompatible types.\n" +
+  s"\tLeft: ${joinNode.getLeft.toString},\n" +
+  s"\tRight: ${joinNode.getRight.toString},\n" +
+  s"\tCondition: (${expression(joinNode.getCondition,
+joinNode.getRowType.getFieldNames.toList, None)})"
+  )
+}
+  })
+}
+(leftKeys.toArray, rightKeys.toArray, otherCondition)
+  }
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014261#comment-16014261
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117037366
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
--- End diff --

This method needs some good unit tests.
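
As a rough idea of the cases such tests could cover, here is a minimal sketch built on a simplified stand-in for the bound derivation. It is not the PR's logic (the relevant part of the diff is truncated here): `WindowBoundsSketch`, `Retention`, and `retention` are hypothetical names, and the model assumes a condition of the form `a.t BETWEEN b.t + lowerMs AND b.t + upperMs` with `a` as the left stream and records arriving in time order.

```scala
object WindowBoundsSketch {
  // How long (in ms) past records of each stream must be kept to find all matches.
  final case class Retention(leftMs: Long, rightMs: Long)

  // Model: a.t BETWEEN b.t + lowerMs AND b.t + upperMs (a = left, b = right).
  // A new b record at time T matches a records in [T + lowerMs, T + upperMs],
  // so past a records are needed only when lowerMs is negative.
  // A new a record at time T matches b records in [T - upperMs, T - lowerMs],
  // so past b records are needed only when upperMs is positive.
  def retention(lowerMs: Long, upperMs: Long): Retention = {
    require(lowerMs <= upperMs, "time range must not be empty")
    Retention(
      leftMs = math.max(0L, -lowerMs),
      rightMs = math.max(0L, upperMs))
  }
}
```

Under this model, the query from the issue description (`o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR`) would correspond to `retention(0L, 3600000L)`, and a symmetric one-hour range to `retention(-3600000L, 3600000L)`. Real tests would of course need to build `RexNode` conditions and call `analyzeTimeBoundary` directly.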


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition, e.g. {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}}, may only use proctime, which is a system attribute. It 
> must define a bounded time range such as {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; 
> unbounded conditions such as {{o.proctime > s.proctime}} are not supported. 
> It must also reference the proctime attributes of both streams, so 
> {{o.proctime BETWEEN proctime() AND proctime() + 1}} is not supported either.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014239#comment-16014239
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116861487
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
--- End diff --

more inline comments would be good


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition, e.g. {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}}, may only use proctime, which is a system attribute. It 
> must define a bounded time range such as {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; 
> unbounded conditions such as {{o.proctime > s.proctime}} are not supported. 
> It must also reference the proctime attributes of both streams, so 
> {{o.proctime BETWEEN proctime() AND proctime() + 1}} is not supported either.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 





[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116791596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 
< b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
--- End diff --

Please wrap the arguments of a function call as follows if they do not fit 
on a single line:
```
analyzeTimeCondition(
  condition, 
  greateConditions,
  lessConditions, 
  leftLogicalFieldCnt, 
  inputType)
```




[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014263#comment-16014263
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117027410
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 
< b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime 
indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use 
rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a 
stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, 
rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014249#comment-16014249
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116794136
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 
< b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime 
indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use 
rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a 
stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, 
rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014241#comment-16014241
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116796524
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, 
FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 
< b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime 
indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use 
rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a 
stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, 
rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117004436
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
--- End diff --

I think this would make the code in this class a lot simpler, because we 
would not need to recursively dig into the condition. We can iterate over all 
conjunctive conditions, check for each whether it is a valid time-bound 
condition, and remove it from the remaining condition if it is.
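
As a rough illustration of that iteration (not code from the PR), here is a minimal sketch over a hypothetical, simplified expression model. `Expr`, `And`, `TimeBound`, `Other`, and `ConjunctSplit` are made-up names for this example; the real code would work on Calcite `RexNode`s and would have to decide itself what counts as a valid time bound.

```scala
// Hypothetical, simplified stand-in for a join condition tree.
sealed trait Expr
final case class And(children: List[Expr]) extends Expr
// A conjunct already recognized as a valid time-bound condition (assumption).
final case class TimeBound(description: String) extends Expr
// Any other conjunct, e.g. an equi-join predicate.
final case class Other(description: String) extends Expr

object ConjunctSplit {
  // Flatten nested ANDs into a flat list of conjuncts.
  def conjuncts(e: Expr): List[Expr] = e match {
    case And(children) => children.flatMap(conjuncts)
    case leaf          => List(leaf)
  }

  // One pass over the conjuncts: time bounds go left, everything else stays.
  def splitTimeBounds(e: Expr): (List[TimeBound], List[Expr]) = {
    val all = conjuncts(e)
    val timeBounds = all.collect { case t: TimeBound => t }
    val remaining = all.filterNot(_.isInstanceOf[TimeBound])
    (timeBounds, remaining)
  }
}
```

With this shape, no recursion into individual predicates is needed; the only recursive step is flattening the top-level conjunction.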


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116790511
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
--- End diff --

This should be `leftLogicalFieldCnt`.

Also add `rightLogicalFieldCnt`.




[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014235#comment-16014235
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116993635
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+
+class DataStreamJoinRule
+  extends ConverterRule(
+  classOf[FlinkLogicalJoin],
+  FlinkConventions.LOGICAL,
+  FlinkConventions.DATASTREAM,
+  "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+
+val joinInfo = join.analyzeCondition
--- End diff --

I think it makes sense to separate different join cases on the level of 
plan operators. That would mean we would check here if the join has bounded 
time predicates (for example `left.rowtime BETWEEN right.rowtime - INTERVAL '1' 
MINUTE AND right.rowtime + INTERVAL '1' MINUTE`).

If this is the case, we would extract the relevant time predicates and 
create a stream-stream join RelNode. This would move a bit of code out of the 
`DataStreamJoin` into the rule.

IMO, the benefit is that it will be easier to add other joins because we 
only need to add a new rule, a new RelNode and runtime code. Hence, we would 
not need to touch the existing join strategy.

What do you think about this, @hongyuhong?
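
To make the "bounded time predicates" check concrete, here is a small sketch under stated assumptions. It is not the PR's code. It assumes each time predicate has already been normalized to the form `left.time OP right.time + offsetMs`; `Cmp`, `TimePredicate`, and `BoundedJoinCheck` are hypothetical names introduced only for this example.

```scala
// Hypothetical normalized form: left.time OP right.time + offsetMs
sealed trait Cmp
case object Greater extends Cmp // stands for > or >= (lower bound)
case object Less extends Cmp    // stands for < or <= (upper bound)
final case class TimePredicate(cmp: Cmp, offsetMs: Long)

object BoundedJoinCheck {
  // Returns the (lower, upper) offsets in milliseconds if the predicates
  // bound the left time attribute from both sides, otherwise None.
  def boundedWindow(preds: Seq[TimePredicate]): Option[(Long, Long)] = {
    val lowers = preds.collect { case TimePredicate(Greater, off) => off }
    val uppers = preds.collect { case TimePredicate(Less, off) => off }
    if (lowers.isEmpty || uppers.isEmpty) {
      None // unbounded on at least one side
    } else {
      val lo = lowers.max // tightest lower bound
      val hi = uppers.min // tightest upper bound
      if (lo <= hi) Some((lo, hi)) else None // empty window is rejected
    }
  }
}
```

A rule's `matches()` could then return true only when such a check yields a window, leaving every other join shape to other rules.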


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition (e.g. {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}}) can only use proctime, which is a system attribute. It 
> must be a bounded time range such as {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; 
> unbounded conditions such as {{o.proctime > s.proctime}} are not supported. It 
> must also reference the proctime attributes of both streams, so 
> {{o.proctime BETWEEN proctime() AND proctime() + 1}} is not supported either.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014257#comment-16014257
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r117002279
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
--- End diff --

As a first step, the `condition` should be converted into CNF (conjunctive 
normal form) for normalization. Calcite offers the `RexUtil.toCnf()` method for 
that.
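
For readers unfamiliar with CNF, the following toy converter shows what the normalization does. It is only a sketch over a hypothetical predicate type (`Pred`, `Atom`, `AndP`, `OrP`, and `Cnf` are made up here), it ignores negation, and it is not how Calcite implements `RexUtil.toCnf()`.

```scala
// Hypothetical boolean predicate model without negation.
sealed trait Pred
final case class Atom(name: String) extends Pred
final case class AndP(l: Pred, r: Pred) extends Pred
final case class OrP(l: Pred, r: Pred) extends Pred

object Cnf {
  // Rewrite a predicate so that no AND appears underneath an OR.
  def toCnf(p: Pred): Pred = p match {
    case AndP(l, r) => AndP(toCnf(l), toCnf(r))
    case OrP(l, r)  => distribute(toCnf(l), toCnf(r))
    case a: Atom    => a
  }

  // Distribute OR over AND: a OR (b AND c) => (a OR b) AND (a OR c).
  // Both arguments are assumed to be in CNF already.
  private def distribute(l: Pred, r: Pred): Pred = (l, r) match {
    case (AndP(a, b), c) => AndP(distribute(a, c), distribute(b, c))
    case (a, AndP(b, c)) => AndP(distribute(a, b), distribute(a, c))
    case _               => OrP(l, r)
  }
}
```

After such a normalization the condition is a conjunction at the top level, so each conjunct can be inspected on its own.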



[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014250#comment-16014250
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116792670
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
   

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r116796524
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get the time type
+* and return condition without time-condition.
+*
+* @param  condition   other condtion include time-condition
+* @param  leftFieldCount left stream fields count
+* @param  inputType   left and right connect stream type
+* @param  rexBuilder   util to build rexNode
+* @param  config  table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  leftPhysicalFieldCnt: Int,
+  inputType: RelDataType,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (RelDataType, Long, Long, RexNode) = {
+// analyze the time-conditon to get greate and less condition,
+// make sure left stream field in the left of the condition
+// e.g b.proctime > a.proctime - 1 will be translate to a.proctime - 1 < b.proctime
+val greateConditions = new util.ArrayList[TimeSingleCondition]()
+val lessConditions = new util.ArrayList[TimeSingleCondition]()
+analyzeTimeCondition(condition, greateConditions,
+  lessConditions, leftLogicalFieldCnt, inputType)
+if (greateConditions.size != lessConditions.size
+|| greateConditions.size > 1
+|| greateConditions.size == 0) {
+  throw TableException(
+"Equality join time conditon should have proctime or rowtime indicator."
+  )
+}
+
+val greatCond = greateConditions.get(0)
+val lessCond = lessConditions.get(0)
+if (greatCond.timeType != lessCond.timeType) {
+  throw TableException(
+"Equality join time conditon should all use proctime or all use rowtime."
+  )
+}
+
+var leftStreamWindowSize: Long = 0
+var rightStreamWindowSize: Long = 0
+
+// only a.proctime > b.proctime - interval '1' hour need to store a stream
+val timeLiteral: RexLiteral =
+reduceTimeExpression(greatCond.rightExpr, greatCond.leftExpr, rexBuilder, config)
+leftStreamWindowSize = timeLiteral.getValue2.asInstanceOf[Long]
+// only need to store past records
+if (leftStreamWindowSize < 0) {
+  leftStreamWindowSize = -leftStreamWindowSize
+  if (!greatCond.isEqual) {
+leftStreamWindowSize -= 1
+  }
+} else {
+  leftStreamWindowSize = 0
+}
+
+// only 
