[jira] [Commented] (FLINK-6491) Add QueryConfig to specify state retention time for streaming queries

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005933#comment-16005933
 ] 

ASF GitHub Bot commented on FLINK-6491:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3863
  
Hi @fhueske, thanks a lot for your review. I have updated the PR with the 
following changes:
* Unified the camel-case naming: changed all `qConfig` and `qConf` to 
`queryConfig`.
* Added a warning log to `DataStreamOverAggregate`.
* Fixed the cleanup logic.
* Extended the test cases to cover the corner cases of the cleanup logic.

There are two things I have not done:
* `check for processing time over range windows that 
minIdleStateRetentionTime > preceding interval.`
   I do not think we need to add this check, because both row-based and 
time-based windows can produce incorrect results due to state cleanup. The 
important thing is to let users know how the cleanup config works.
* `remove the registerEventCleanupTimer method`
   I added this method because in a row-time OVER window 
(TimeDomain.EVENT_TIME), we always get 0 when we call 
ctx.timerService.currentProcessingTime.

What do you think?
Thanks,
SunJincheng


> Add QueryConfig to specify state retention time for streaming queries
> -
>
> Key: FLINK-6491
> URL: https://issues.apache.org/jira/browse/FLINK-6491
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>Priority: Critical
>
> By now we have a couple of streaming operators (group-windows, over-windows, 
> non-windowed aggregations) that require operator state. Since state is not 
> automatically cleaned up by Flink, we need to add a mechanism to configure a 
> state retention time. 
> If configured, a query will retain state for a specified period of state 
> inactivity. If state is not accessed within this period of time, it will be 
> cleared. I propose to add two parameters for this, a min and a max retention 
> time. The min retention time specifies the earliest time and the max 
> retention time the latest time when state is cleared. The reasoning for 
> having two parameters is that we can avoid registering many timers if we 
> have more freedom about when to discard state.
> This issue also introduces a QueryConfig object which can be passed to a 
> streaming query when it is emitted to a TableSink or converted to a 
> DataStream (append or retraction).
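
For illustration, a minimal sketch of how the proposed config might be used 
from the Java Table API (method names follow the PR discussion and may differ 
from the final API):

{code}
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// keep idle state for at least 12 and at most 24 hours
StreamQueryConfig queryConfig = tEnv.queryConfig()
  .withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

Table result = tEnv.sql("SELECT user, COUNT(*) AS cnt FROM clicks GROUP BY user");

// the config is passed when the query is converted to a DataStream
DataStream<Tuple2<Boolean, Row>> stream =
  tEnv.toRetractStream(result, Row.class, queryConfig);
{code}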



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3863: [FLINK-6491][table]Add QueryConfig to specify state reten...

2017-05-10 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3863
  
Hi @fhueske, thanks a lot for your review. I have updated the PR with the 
following changes:
* Unified the camel-case naming: changed all `qConfig` and `qConf` to 
`queryConfig`.
* Added a warning log to `DataStreamOverAggregate`.
* Fixed the cleanup logic.
* Extended the test cases to cover the corner cases of the cleanup logic.

There are two things I have not done:
* `check for processing time over range windows that 
minIdleStateRetentionTime > preceding interval.`
   I do not think we need to add this check, because both row-based and 
time-based windows can produce incorrect results due to state cleanup. The 
important thing is to let users know how the cleanup config works.
* `remove the registerEventCleanupTimer method`
   I added this method because in a row-time OVER window 
(TimeDomain.EVENT_TIME), we always get 0 when we call 
ctx.timerService.currentProcessingTime.

What do you think?
Thanks,
SunJincheng


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6491) Add QueryConfig to specify state retention time for streaming queries

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005922#comment-16005922
 ] 

ASF GitHub Bot commented on FLINK-6491:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3863#discussion_r115908788
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -239,6 +248,7 @@ class DataStreamOverAggregate(
   }
 
   def createBoundedAndCurrentRowOverWindow(
--- End diff --

I do not think we need to add this check, because both `row-based` and 
`time-based` windows can produce incorrect results due to state cleanup. The 
important thing is to let users know how the `cleanup config` works. 


> Add QueryConfig to specify state retention time for streaming queries
> -
>
> Key: FLINK-6491
> URL: https://issues.apache.org/jira/browse/FLINK-6491
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>Priority: Critical
>
> By now we have a couple of streaming operators (group-windows, over-windows, 
> non-windowed aggregations) that require operator state. Since state is not 
> automatically cleaned up by Flink, we need to add a mechanism to configure a 
> state retention time. 
> If configured, a query will retain state for a specified period of state 
> inactivity. If state is not accessed within this period of time, it will be 
> cleared. I propose to add two parameters for this, a min and a max retention 
> time. The min retention time specifies the earliest time and the max 
> retention time the latest time when state is cleared. The reasoning for 
> having two parameters is that we can avoid registering many timers if we 
> have more freedom about when to discard state.
> This issue also introduces a QueryConfig object which can be passed to a 
> streaming query when it is emitted to a TableSink or converted to a 
> DataStream (append or retraction).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3863: [FLINK-6491][table]Add QueryConfig to specify stat...

2017-05-10 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3863#discussion_r115908788
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -239,6 +248,7 @@ class DataStreamOverAggregate(
   }
 
   def createBoundedAndCurrentRowOverWindow(
--- End diff --

I do not think we need to add this check, because both `row-based` and 
`time-based` windows can produce incorrect results due to state cleanup. The 
important thing is to let users know how the `cleanup config` works. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6491) Add QueryConfig to specify state retention time for streaming queries

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005913#comment-16005913
 ] 

ASF GitHub Bot commented on FLINK-6491:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3863#discussion_r115907975
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.state.State
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+
+abstract class ProcessFunctionWithCleanupState[IN, OUT](qConfig: StreamQueryConfig)
+  extends ProcessFunction[IN, OUT] {
+
+  protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
+  // interval in which clean-up timers are registered
+  protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
+
+  // holds the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected def initCleanupTimeState(stateName: String) {
+    if (stateCleaningEnabled) {
+      val inputCntDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong](stateName, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+    }
+  }
+
+  protected def registerProcessingCleanupTimer(
+      ctx: ProcessFunction[IN, OUT]#Context,
+      currentTime: Long): Unit = {
+    if (stateCleaningEnabled) {
+
+      val earliestCleanup = currentTime + minRetentionTime
+
+      // last registered timer
+      val lastCleanupTime = cleanupTimeState.value()
+
+      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
+        // we need to register a new timer
+        val cleanupTime = earliestCleanup + cleanupTimerInterval
+        // register timer and remember clean-up time
+        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+        cleanupTimeState.update(cleanupTime)
+      }
+    }
+  }
+
+  protected def registerEventCleanupTimer(
--- End diff --

The reason I added this method is that in a `row-time` OVER window 
(TimeDomain.EVENT_TIME), we always get `0` when we call 
`ctx.timerService.currentProcessingTime`.
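
To make the effect of the two retention parameters concrete, here is a small 
self-contained sketch (not Flink code) that mirrors the registration logic in 
the diff above; with minRetention = 12h and maxRetention = 24h, a key that is 
accessed continuously registers one timer per coalescing interval instead of 
one per access:

{code}
public class CleanupTimerCoalescing {
  static final long MIN_RETENTION = 12 * 3600 * 1000L;        // minRetentionTime
  static final long MAX_RETENTION = 24 * 3600 * 1000L;        // maxRetentionTime
  static final long INTERVAL = MAX_RETENTION - MIN_RETENTION; // cleanupTimerInterval

  static Long lastCleanupTime = null; // mimics cleanupTimeState
  static int registeredTimers = 0;

  static void onStateAccess(long currentTime) {
    long earliestCleanup = currentTime + MIN_RETENTION;
    if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + INTERVAL) {
      lastCleanupTime = earliestCleanup + INTERVAL; // the actual clean-up time
      registeredTimers++;                           // a timer would be registered here
    }
  }

  public static void main(String[] args) {
    // 1,000 accesses spread over one hour trigger a single timer registration
    for (long t = 0; t < 3_600_000L; t += 3_600L) {
      onStateAccess(t);
    }
    System.out.println("timers registered: " + registeredTimers); // prints 1
  }
}
{code}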


> Add QueryConfig to specify state retention time for streaming queries
> -
>
> Key: FLINK-6491
> URL: https://issues.apache.org/jira/browse/FLINK-6491
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>Priority: Critical
>
> By now we have a couple of streaming operators (group-windows, over-windows, 
> non-windowed aggregations) that require operator state. Since state is not 
> automatically cleaned up by Flink, we need to add a mechanism to configure a 
> state retention time. 
> If configured, a query will retain state for a specified period of state 
> inactivity. If state is not accessed within this period of time, it will be 
> cleared. I propose to add two parameters for this, a min and a max retention 
> time. The min retention time specifies the earliest time and the max 
> retention time the latest time when state is cleared. The reasoning for 
> having two parameters is that we can avoid registering 

[GitHub] flink pull request #3863: [FLINK-6491][table]Add QueryConfig to specify stat...

2017-05-10 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3863#discussion_r115907975
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.state.State
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+
+abstract class ProcessFunctionWithCleanupState[IN, OUT](qConfig: StreamQueryConfig)
+  extends ProcessFunction[IN, OUT] {
+
+  protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
+  // interval in which clean-up timers are registered
+  protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
+
+  // holds the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected def initCleanupTimeState(stateName: String) {
+    if (stateCleaningEnabled) {
+      val inputCntDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong](stateName, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+    }
+  }
+
+  protected def registerProcessingCleanupTimer(
+      ctx: ProcessFunction[IN, OUT]#Context,
+      currentTime: Long): Unit = {
+    if (stateCleaningEnabled) {
+
+      val earliestCleanup = currentTime + minRetentionTime
+
+      // last registered timer
+      val lastCleanupTime = cleanupTimeState.value()
+
+      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
+        // we need to register a new timer
+        val cleanupTime = earliestCleanup + cleanupTimerInterval
+        // register timer and remember clean-up time
+        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+        cleanupTimeState.update(cleanupTime)
+      }
+    }
+  }
+
+  protected def registerEventCleanupTimer(
--- End diff --

The reason I added this method is that in a `row-time` OVER window 
(TimeDomain.EVENT_TIME), we always get `0` when we call 
`ctx.timerService.currentProcessingTime`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6532) Mesos version check

2017-05-10 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-6532:
---

 Summary: Mesos version check
 Key: FLINK-6532
 URL: https://issues.apache.org/jira/browse/FLINK-6532
 Project: Flink
  Issue Type: Improvement
  Components: Mesos
Reporter: Eron Wright 


The minimum Mesos version required by Flink's Mesos subsystem is 1.0. We 
should enforce this requirement with a version check upon connection. This may 
be accomplished by checking the 'version' property of the 'MesosInfo' structure.
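
A minimal sketch of such a gate (the accessor name and the exception type are 
assumptions; the issue only names the 'version' property):

{code}
// Hypothetical version gate, run when the scheduler registers with the master.
String version = masterInfo.getVersion(); // e.g. "1.0.1"
int major = Integer.parseInt(version.split("\\.")[0]);
if (major < 1) {
  throw new RuntimeException(
    "Flink requires Mesos 1.0 or newer, but the master reports version " + version);
}
{code}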



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6502) Add support ElasticsearchSink for DataSet

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005806#comment-16005806
 ] 

ASF GitHub Bot commented on FLINK-6502:
---

GitHub user 397090770 opened a pull request:

https://github.com/apache/flink/pull/3869

[FLINK-6502] Add support ElasticsearchSink for DataSet

Currently, Flink only supports writing data in a `DataStream` to Elasticsearch 
through `ElasticsearchSink`; it would be very useful if Flink internally 
supported writing data in a `DataSet` to Elasticsearch as well. See 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ElasticsearchSink-on-DataSet-td12980.html](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ElasticsearchSink-on-DataSet-td12980.html)

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/397090770/flink FLINK-6502

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3869.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3869


commit a58ce4bcd86aae000c8308a66ca5c74a1812fb46
Author: yangping.wu 
Date:   2017-05-11T02:14:03Z

[FLINK-6502] Add support ElasticsearchSink for DataSet




> Add support ElasticsearchSink for DataSet
> -
>
> Key: FLINK-6502
> URL: https://issues.apache.org/jira/browse/FLINK-6502
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.2.1
>Reporter: wyp
>
> Currently, Flink only supports writing data in a DataStream to Elasticsearch 
> through {{ElasticsearchSink}}; we should be able to write data in a DataSet 
> to Elasticsearch too. See 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ElasticsearchSink-on-DataSet-td12980.html]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3869: [FLINK-6502] Add support ElasticsearchSink for Dat...

2017-05-10 Thread 397090770
GitHub user 397090770 opened a pull request:

https://github.com/apache/flink/pull/3869

[FLINK-6502] Add support ElasticsearchSink for DataSet

Currently, Flink only supports writing data in a `DataStream` to Elasticsearch 
through `ElasticsearchSink`; it would be very useful if Flink internally 
supported writing data in a `DataSet` to Elasticsearch as well. See 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ElasticsearchSink-on-DataSet-td12980.html](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ElasticsearchSink-on-DataSet-td12980.html)

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/397090770/flink FLINK-6502

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3869.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3869


commit a58ce4bcd86aae000c8308a66ca5c74a1812fb46
Author: yangping.wu 
Date:   2017-05-11T02:14:03Z

[FLINK-6502] Add support ElasticsearchSink for DataSet




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-6502) Add Support ElasticsearchSink for DataSet

2017-05-10 Thread wyp (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wyp updated FLINK-6502:
---
Summary: Add Support ElasticsearchSink for DataSet  (was: Support writing 
data in DataSet to ElasticSearch)

> Add Support ElasticsearchSink for DataSet
> -
>
> Key: FLINK-6502
> URL: https://issues.apache.org/jira/browse/FLINK-6502
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.2.1
>Reporter: wyp
>
> Currently, Flink only supports writing data in a DataStream to Elasticsearch 
> through {{ElasticsearchSink}}; we should be able to write data in a DataSet 
> to Elasticsearch too. See 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ElasticsearchSink-on-DataSet-td12980.html]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6502) Add support ElasticsearchSink for DataSet

2017-05-10 Thread wyp (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wyp updated FLINK-6502:
---
Summary: Add support ElasticsearchSink for DataSet  (was: Add Support 
ElasticsearchSink for DataSet)

> Add support ElasticsearchSink for DataSet
> -
>
> Key: FLINK-6502
> URL: https://issues.apache.org/jira/browse/FLINK-6502
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.2.1
>Reporter: wyp
>
> Currently, Flink only supports writing data in a DataStream to Elasticsearch 
> through {{ElasticsearchSink}}; we should be able to write data in a DataSet 
> to Elasticsearch too. See 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ElasticsearchSink-on-DataSet-td12980.html]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6491) Add QueryConfig to specify state retention time for streaming queries

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005751#comment-16005751
 ] 

ASF GitHub Bot commented on FLINK-6491:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3863#discussion_r115888057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -81,6 +81,8 @@ abstract class StreamTableEnvironment(
   // the naming pattern for internally registered tables.
   private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
 
+  def qConf: StreamQueryConfig = new StreamQueryConfig
--- End diff --

Yes, I will change all `qConfig` and `qConf` to `queryConfig`.


> Add QueryConfig to specify state retention time for streaming queries
> -
>
> Key: FLINK-6491
> URL: https://issues.apache.org/jira/browse/FLINK-6491
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>Priority: Critical
>
> By now we have a couple of streaming operators (group-windows, over-windows, 
> non-windowed aggregations) that require operator state. Since state is not 
> automatically cleaned up by Flink, we need to add a mechanism to configure a 
> state retention time. 
> If configured, a query will retain state for a specified period of state 
> inactivity. If state is not accessed within this period of time, it will be 
> cleared. I propose to add two parameters for this, a min and a max retention 
> time. The min retention time specifies the earliest time and the max 
> retention time the latest time when state is cleared. The reasoning for 
> having two parameters is that we can avoid registering many timers if we 
> have more freedom about when to discard state.
> This issue also introduces a QueryConfig object which can be passed to a 
> streaming query when it is emitted to a TableSink or converted to a 
> DataStream (append or retraction).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3863: [FLINK-6491][table]Add QueryConfig to specify stat...

2017-05-10 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3863#discussion_r115888057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -81,6 +81,8 @@ abstract class StreamTableEnvironment(
   // the naming pattern for internally registered tables.
   private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
 
+  def qConf: StreamQueryConfig = new StreamQueryConfig
--- End diff --

Yes, I will change all `qConfig` and `qConf` to `queryConfig`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2017-05-10 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4534:
--
Description: 
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():
{code}
for (BucketState<T> bucketState : state.bucketStates.values()) {
{code}
and the following in close():

{code}
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
starting at line 752:
{code}
Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
LOG.debug("Moving pending files to final location on restore.");
for (Long pastCheckpointId : pastCheckpointIds) {
{code}

  was:
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():

{code}
for (BucketState<T> bucketState : state.bucketStates.values()) {
{code}
and the following in close():

{code}
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
starting at line 752:
{code}
Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
LOG.debug("Moving pending files to final location on restore.");
for (Long pastCheckpointId : pastCheckpointIds) {
{code}


> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and the following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
> starting at line 752:
> {code}
> Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
> LOG.debug("Moving pending files to final location on restore.");
> for (Long pastCheckpointId : pastCheckpointIds) {
> {code}
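
A minimal sketch of the suggested fix (types abbreviated as in the issue; the 
restore logic itself is elided):

{code}
synchronized (state.bucketStates) {
  for (BucketState<T> bucketState : state.bucketStates.values()) {
    // ... restore logic, now guarded by the same lock as the other methods ...
  }
}
{code}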



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5376) Misleading log statements in UnorderedStreamElementQueue

2017-05-10 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-5376:
--
Description: 
The following are two examples where ordered stream element queue is mentioned:

{code}
LOG.debug("Put element into ordered stream element queue. New filling degree " +
  "({}/{}).", numberEntries, capacity);

  return true;
} else {
  LOG.debug("Failed to put element into ordered stream element queue because it " +
{code}
I guess OrderedStreamElementQueue was coded first.

  was:
The following are two examples where ordered stream element queue is mentioned:
{code}
LOG.debug("Put element into ordered stream element queue. New filling degree " +
  "({}/{}).", numberEntries, capacity);

  return true;
} else {
  LOG.debug("Failed to put element into ordered stream element queue because it " +
{code}
I guess OrderedStreamElementQueue was coded first.


> Misleading log statements in UnorderedStreamElementQueue
> 
>
> Key: FLINK-5376
> URL: https://issues.apache.org/jira/browse/FLINK-5376
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The following are two examples where ordered stream element queue is 
> mentioned:
> {code}
> LOG.debug("Put element into ordered stream element queue. New filling degree " +
>   "({}/{}).", numberEntries, capacity);
>
>   return true;
> } else {
>   LOG.debug("Failed to put element into ordered stream element queue because it " +
> {code}
> I guess OrderedStreamElementQueue was coded first.
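
A sketch of the corrected messages for UnorderedStreamElementQueue (only the 
queue name changes; the surrounding code and the second message are 
abbreviated exactly as in the snippet above):

{code}
LOG.debug("Put element into unordered stream element queue. New filling degree " +
  "({}/{}).", numberEntries, capacity);
// ...
LOG.debug("Failed to put element into unordered stream element queue because it " +
{code}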



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-05-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822346#comment-15822346
 ] 

Ted Yu edited comment on FLINK-5486 at 5/11/17 1:04 AM:


Lock on State.bucketStates should be held in the following method:
{code}
private void handleRestoredBucketState(State<T> restoredState) {
  Preconditions.checkNotNull(restoredState);

  for (BucketState<T> bucketState : restoredState.bucketStates.values()) {
{code}


was (Author: yuzhih...@gmail.com):
Lock on State.bucketStates should be held in the following method:

{code}
private void handleRestoredBucketState(State<T> restoredState) {
  Preconditions.checkNotNull(restoredState);

  for (BucketState<T> bucketState : restoredState.bucketStates.values()) {
{code}

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>
> Here is related code:
> {code}
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
> synchronized (bucketState.pendingFilesPerCheckpoint) {
>   bucketState.pendingFilesPerCheckpoint.clear();
> }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.
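
A minimal sketch of the suggested change (widening the synchronized block to 
cover the call):

{code}
synchronized (bucketState.pendingFilesPerCheckpoint) {
  handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
  bucketState.pendingFilesPerCheckpoint.clear();
}
{code}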



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2017-05-10 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-6105:
--
Description: 
When catching InterruptedException, we should throw InterruptedIOException 
instead of IOException.

The following example is from HadoopInputFormatBase :
{code}
try {
  splits = this.mapreduceInputFormat.getSplits(jobContext);
} catch (InterruptedException e) {
  throw new IOException("Could not get Splits.", e);
}
{code}

There may be other places where IOE is thrown.

  was:
When catching InterruptedException, we should throw InterruptedIOException 
instead of IOException.

The following example is from HadoopInputFormatBase :

{code}
try {
  splits = this.mapreduceInputFormat.getSplits(jobContext);
} catch (InterruptedException e) {
  throw new IOException("Could not get Splits.", e);
}
{code}

There may be other places where IOE is thrown.


> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.
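
A minimal sketch of the suggested handling (note that InterruptedIOException 
has no cause-taking constructor, so the cause is attached via initCause; 
restoring the interrupt flag is an extra precaution, not part of the issue 
text):

{code}
try {
  splits = this.mapreduceInputFormat.getSplits(jobContext);
} catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // restore the interrupt flag
  InterruptedIOException iioe = new InterruptedIOException("Could not get Splits.");
  iioe.initCause(e);
  throw iioe;
}
{code}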



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()

2017-05-10 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-5541:
--
Description: 
{code}
if (localJar == null) {
  try {
    for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
        .getJars()) {
      // TODO verify that there is only one jar
      localJar = new File(url.toURI()).getAbsolutePath();
    }
  } catch (final URISyntaxException e) {
    // ignore
  } catch (final ClassCastException e) {
    // ignore
  }
}

logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
client.submitTopologyWithOpts(name, localJar, topology);
{code}
Since the try block may encounter URISyntaxException / ClassCastException, we 
should check that localJar is not null before calling submitTopologyWithOpts().

  was:
{code}
if (localJar == null) {
  try {
    for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
        .getJars()) {
      // TODO verify that there is only one jar
      localJar = new File(url.toURI()).getAbsolutePath();
    }
  } catch (final URISyntaxException e) {
    // ignore
  } catch (final ClassCastException e) {
    // ignore
  }
}

logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
client.submitTopologyWithOpts(name, localJar, topology);
{code}

Since the try block may encounter URISyntaxException / ClassCastException, we 
should check that localJar is not null before calling submitTopologyWithOpts().


> Missing null check for localJar in FlinkSubmitter#submitTopology()
> --
>
> Key: FLINK-5541
> URL: https://issues.apache.org/jira/browse/FLINK-5541
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> if (localJar == null) {
>   try {
>     for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
>         .getJars()) {
>       // TODO verify that there is only one jar
>       localJar = new File(url.toURI()).getAbsolutePath();
>     }
>   } catch (final URISyntaxException e) {
>     // ignore
>   } catch (final ClassCastException e) {
>     // ignore
>   }
> }
>
> logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
> client.submitTopologyWithOpts(name, localJar, topology);
> {code}
> Since the try block may encounter URISyntaxException / ClassCastException, we 
> should check that localJar is not null before calling 
> submitTopologyWithOpts().
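
A minimal sketch of the suggested guard (the exception type is an assumption):

{code}
if (localJar == null) {
  throw new IllegalStateException(
    "Could not determine the jar of the topology; cannot submit with a null jar.");
}
logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
client.submitTopologyWithOpts(name, localJar, topology);
{code}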



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5488) yarnClient should be closed in AbstractYarnClusterDescriptor for error conditions

2017-05-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005742#comment-16005742
 ] 

Ted Yu commented on FLINK-5488:
---

I agree.

> yarnClient should be closed in AbstractYarnClusterDescriptor for error 
> conditions
> -
>
> Key: FLINK-5488
> URL: https://issues.apache.org/jira/browse/FLINK-5488
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Ted Yu
>Assignee: Fang Yong
>
> Here is one example:
> {code}
> if (jobManagerMemoryMb > maxRes.getMemory()) {
>   failSessionDuringDeployment(yarnClient, yarnApplication);
>   throw new YarnDeploymentException("The cluster does not have the requested "
>     + "resources for the JobManager available!\n"
>     + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: "
>     + jobManagerMemoryMb + "MB. " + NOTE);
> }
> {code}
> yarnClient implements Closeable.
> It should be closed in situations where an exception is thrown.
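
A minimal sketch of the suggested clean-up on the error path (the issue notes 
that yarnClient implements Closeable):

{code}
if (jobManagerMemoryMb > maxRes.getMemory()) {
  failSessionDuringDeployment(yarnClient, yarnApplication);
  yarnClient.close(); // release the client before bailing out
  throw new YarnDeploymentException("The cluster does not have the requested "
    + "resources for the JobManager available!\n"
    + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: "
    + jobManagerMemoryMb + "MB. " + NOTE);
}
{code}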



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6332) Upgrade Scala version to 2.11.11

2017-05-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005740#comment-16005740
 ] 

Ted Yu commented on FLINK-6332:
---

We can wait for a newer Scala release with more visible benefits.

> Upgrade Scala version to 2.11.11
> 
>
> Key: FLINK-6332
> URL: https://issues.apache.org/jira/browse/FLINK-6332
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Scala API
>Reporter: Ted Yu
>Priority: Minor
>
> Currently, the scala-2.11 profile uses Scala 2.11.7.
> 2.11.11 is the most recent version.
> This issue is to upgrade to Scala 2.11.11.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6414) Use scala.binary.version in place of change-scala-version.sh

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005717#comment-16005717
 ] 

ASF GitHub Bot commented on FLINK-6414:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3800
  
@StephanEwen does the recent discussion on packaging shaded dependencies 
mean that `force-shading` will go away? Or do we keep that even if dependencies 
are no longer being shaded? Or will only some shaded dependencies (the most 
problematic) be provided?


> Use scala.binary.version in place of change-scala-version.sh
> 
>
> Key: FLINK-6414
> URL: https://issues.apache.org/jira/browse/FLINK-6414
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Recent commits have failed to modify {{change-scala-version.sh}}, resulting 
> in broken builds for {{scala-2.11}}. It looks like we can remove the need 
> for this script by replacing hard-coded references to the Scala version with 
> Flink's Maven variable {{scala.binary.version}}.
> I had initially realized that the change script is [only used for 
> building|https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/building.html#scala-versions]
>  and not for switching the IDE environment.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3800: [FLINK-6414] [build] Use scala.binary.version in place of...

2017-05-10 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3800
  
@StephanEwen does the recent discussion on packaging shaded dependencies 
mean that `force-shading` will go away? Or do we keep that even if dependencies 
are no longer being shaded? Or will only some shaded dependencies (the most 
problematic) be provided?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6529) Rework the shading model in Flink

2017-05-10 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005710#comment-16005710
 ] 

Greg Hogan commented on FLINK-6529:
---

[~wheat9], I appreciate your enthusiasm and just wanted to make sure you have 
followed the discussion on the mailing list. I believe these modules will go 
into a separate Flink repo, so you'll want to coordinate with [~StephanEwen], 
especially if the goal is to have this in for 1.3.

> Rework the shading model in Flink
> -
>
> Key: FLINK-6529
> URL: https://issues.apache.org/jira/browse/FLINK-6529
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Stephan Ewen
>Assignee: Haohui Mai
>Priority: Critical
>
> h2. Problem
> Currently, Flink shades dependencies like ASM and Guava into all jars of 
> projects that reference them and relocates the classes.
> There are some drawbacks to that approach; let's discuss them using ASM as 
> an example:
>   - The ASM classes end up, for example, in {{flink-core}}, {{flink-java}}, 
> {{flink-scala}}, {{flink-runtime}}, etc.
>   - Users that reference these dependencies have the classes multiple times 
> in the classpath. That is unclean (it works, though, because the classes are 
> identical). The same happens when building the final dist. jar.
>   - Some of these dependencies require including license files in the shaded 
> jar. It is hard or even impossible to build a good automatic solution for 
> that, partly due to Maven's very poor cross-project path support.
>   - Scala does not support shading very well. Scala classes have references 
> to classes in more places than just the class names (apparently for Scala 
> reflect support). Referencing a Scala project with shaded ASM still requires 
> adding a reference to unshaded ASM (at least as a compile dependency).
> h2. Proposal
> I propose that we build and deploy an {{asm-flink-shaded}} version of ASM 
> and directly program against the relocated namespaces. Since we never use 
> classes that we relocate in public interfaces, Flink users will never see 
> the relocated class names. Internally, it does not hurt to use them.
>   - Proper Maven dependency management, no hidden (shaded) dependencies
>   - One copy of each dependency
>   - Proper Scala interoperability
>   - No clumsy license management (the license is in the deployed 
> {{asm-flink-shaded}})
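
For illustration, under this proposal code would reference the relocated 
classes directly (the exact relocation prefix below is an assumption):

{code}
// import from the deployed asm-flink-shaded artifact instead of plain ASM
import org.apache.flink.shaded.asm.org.objectweb.asm.ClassVisitor;
{code}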



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3866: A refactor to avoid cloned code in try-catch blocks (usin...

2017-05-10 Thread rbonifacio
Github user rbonifacio commented on the issue:

https://github.com/apache/flink/pull/3866
  
@greghogan I changed the files, adding the missing spaces after the catch 
clauses. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005634#comment-16005634
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115871238
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support joining two streams; currently only
+  * inner join is supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function for the other non-equi
+  *                               conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+    private val leftStreamWindowSize: Long,
+    private val rightStreamWindowSize: Long,
+    private val element1Type: TypeInformation[CRow],
+    private val element2Type: TypeInformation[CRow],
+    private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream elements **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream elements **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record the last timer of the left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record the last timer of the right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 =

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005644#comment-16005644
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115874289
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -698,6 +698,12 @@ class CodeGenerator(
       s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)",
       List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
         s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
+  } else if (clazz == classOf[FilterFunction[_]]) {
--- End diff --

Not needed if we use `JoinFunction` instead of `FilterFunction`


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-joins on proctime 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause should include an equi-join condition.
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute; the 
> time condition only supports bounded time ranges like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> unbounded ones like {{o.proctime > s.proctime}}, and it should include both 
> streams' proctime attributes; {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005645#comment-16005645
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115866096
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support joining two streams; currently only
+  * inner join is supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function for the other non-equi
+  *                               conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+    private val leftStreamWindowSize: Long,
+    private val rightStreamWindowSize: Long,
+    private val element1Type: TypeInformation[CRow],
+    private val element2Type: TypeInformation[CRow],
+    private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream elements **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream elements **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record the last timer of the left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record the last timer of the right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 =

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005649#comment-16005649
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115873645
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support joining two streams; currently only
+  * inner joins are supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi
+  *                               conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream elements **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream elements **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record the last timer of the left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record the last timer of the right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+    new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 = 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005641#comment-16005641
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115855446
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode that represents a join of two streams and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      joinNode,
+      leftSchema,
+      schema,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+
+    s"${joinTypeToString(joinNode.getJoinType)}" +
+      s"(condition: (${joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString)}), " +
+      s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("condition", joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString))
+      .item("select", joinSelectionToString(schema.logicalType))
+      .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+
+    val config = tableEnv.getConfig
+
+    // get the equality keys and the remaining condition
+    val (leftKeys, rightKeys, otherCondition) =
+      JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+    if (left.isInstanceOf[StreamTableSourceScan]
+        || right.isInstanceOf[StreamTableSourceScan]) {
+      throw new TableException(
+        "Join between stream and table is not supported yet.")
+    }
+    // analyze the time boundary and the time predicate type (proctime/rowtime)
+    val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) =
+      JoinUtil.analyzeTimeBoundary(
+        otherCondition,
+        leftSchema.logicalType.getFieldCount,
+        leftSchema.physicalType.getFieldCount,
+        schema.logicalType,
+        joinNode.getCluster.getRexBuilder,
+        config)
+
+    val leftDataStream = 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005636#comment-16005636
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115864221
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode that represents a join of two streams and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      joinNode,
+      leftSchema,
+      schema,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+
+    s"${joinTypeToString(joinNode.getJoinType)}" +
+      s"(condition: (${joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString)}), " +
+      s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("condition", joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString))
+      .item("select", joinSelectionToString(schema.logicalType))
+      .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+
+    val config = tableEnv.getConfig
+
+    // get the equality keys and the remaining condition
+    val (leftKeys, rightKeys, otherCondition) =
+      JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+    if (left.isInstanceOf[StreamTableSourceScan]
+        || right.isInstanceOf[StreamTableSourceScan]) {
+      throw new TableException(
+        "Join between stream and table is not supported yet.")
+    }
+    // analyze the time boundary and the time predicate type (proctime/rowtime)
+    val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) =
+      JoinUtil.analyzeTimeBoundary(
+        otherCondition,
+        leftSchema.logicalType.getFieldCount,
+        leftSchema.physicalType.getFieldCount,
+        schema.logicalType,
+        joinNode.getCluster.getRexBuilder,
+        config)
+
+    val leftDataStream = 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005632#comment-16005632
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115847719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode that represents a join of two streams and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
--- End diff --

please do not include a `FlinkLogicalJoin` node; pass the condition and the 
join type instead.


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The 
> time condition only supports a bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; 
> unbounded conditions like {{o.proctime > s.proctime}} are not supported. It 
> must include the proctime attributes of both streams, so {{o.proctime 
> between proctime() and proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005623#comment-16005623
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115845838
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+
+class DataStreamJoinRule
+  extends ConverterRule(
+    classOf[FlinkLogicalJoin],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+
+    val joinInfo = join.analyzeCondition
+
+    // joins require an equi-condition or a conjunctive predicate with at least
+    // one equi-condition, and disable outer joins with non-equality predicates
+    !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+
+    val join: FlinkLogicalJoin = rel.asInstanceOf[FlinkLogicalJoin]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convLeft: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.DATASTREAM)
+    val convRight: RelNode = RelOptRule.convert(join.getInput(1), FlinkConventions.DATASTREAM)
+
+    new DataStreamJoin(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      join,
--- End diff --

please pass the join condition and the join type instead of the `FlinkLogicalJoin`. 
That is a node of the logical plan and should not be part of a node in 
the physical plan.
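
For illustration, a minimal sketch of the suggested constructor, assuming the 
condition is carried as a Calcite `RexNode` and the join type as a `JoinRelType` 
(the field names here are hypothetical):

```
class DataStreamJoin(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    leftNode: RelNode,
    rightNode: RelNode,
    joinCondition: RexNode,   // instead of joinNode: FlinkLogicalJoin
    joinType: JoinRelType,
    leftSchema: RowSchema,
    schema: RowSchema,
    ruleDescription: String)
  extends BiRel(cluster, traitSet, leftNode, rightNode)
```

`toString`, `explainTerms`, and `translateToPlan` would then read `joinCondition` 
and `joinType` directly, so the physical node no longer holds a reference into 
the logical plan.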


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The 
> time condition only supports a bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; 
> unbounded conditions like {{o.proctime > s.proctime}} are not supported. It 
> must include the proctime attributes of both streams, so {{o.proctime 
> between proctime() and proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005630#comment-16005630
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115852788
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode that represents a join of two streams and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      joinNode,
+      leftSchema,
+      schema,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+
+    s"${joinTypeToString(joinNode.getJoinType)}" +
+      s"(condition: (${joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString)}), " +
+      s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("condition", joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString))
+      .item("select", joinSelectionToString(schema.logicalType))
+      .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+
+    val config = tableEnv.getConfig
+
+    // get the equality keys and the remaining condition
+    val (leftKeys, rightKeys, otherCondition) =
+      JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+    if (left.isInstanceOf[StreamTableSourceScan]
+        || right.isInstanceOf[StreamTableSourceScan]) {
+      throw new TableException(
+        "Join between stream and table is not supported yet.")
+    }
+    // analyze the time boundary and the time predicate type (proctime/rowtime)
+    val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) =
+      JoinUtil.analyzeTimeBoundary(
+        otherCondition,
+        leftSchema.logicalType.getFieldCount,
+        leftSchema.physicalType.getFieldCount,
+        schema.logicalType,
+        joinNode.getCluster.getRexBuilder,
+        config)
+
+    val leftDataStream = 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005631#comment-16005631
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115865585
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support joining two streams; currently only
+  * inner joins are supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi
+  *                               conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream elements **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream elements **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record the last timer of the left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record the last timer of the right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+    new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 = 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005633#comment-16005633
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115872865
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support joining two streams; currently only
+  * inner joins are supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi
+  *                               conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream elements **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream elements **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record the last timer of the left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
--- End diff --

what are the timer states used for? I only see one `value()` call to get 
the value, which checks `== 0` to determine whether a timer is registered. Can 
we make this a boolean state then? 
What would happen if we did not have this state? Would there be more 
timers (timers are unique by time)?
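
For context, a minimal sketch of the registration pattern the `Long` state 
currently guards (hypothetical placement, mirroring the fields quoted above):

```
// in processElement1, after adding the incoming row to row1MapState:
if (timerState1.value() == 0) {
  // no cleanup timer registered yet for this key on the left stream
  val cleanupTime = ctx.timerService().currentProcessingTime() + leftStreamWindowSize
  ctx.timerService().registerProcessingTimeTimer(cleanupTime)
  timerState1.update(cleanupTime)
}
```

Since only the `== 0` check is ever used, a `ValueState[Boolean]` flag would 
carry the same information; dropping the state entirely would re-register a 
timer for every element, and the per-timestamp uniqueness of timers would only 
absorb those whose processing times happen to collide.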


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The 
> time condition only supports a bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; 
> unbounded conditions like {{o.proctime > 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005646#comment-16005646
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115868597
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support joining two streams; currently only
+  * inner joins are supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi
+  *                               conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream elements **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream elements **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record the last timer of the left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record the last timer of the right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+    new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
--- End diff --

indent


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005637#comment-16005637
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115864982
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support joining two streams; currently only
+  * inner joins are supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi
+  *                               conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
--- End diff --

I think we can use `TypeInformation[Row]` here and don't need the `CRow` 
wrapper.
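
A sketch of that suggestion; the caller could unwrap the `Row` type via 
`CRowTypeInfo.rowType` at translation time, so the `asInstanceOf[CRowTypeInfo]` 
casts in `open()` would disappear:

```
class ProcTimeInnerJoin(
  private val leftStreamWindowSize: Long,
  private val rightStreamWindowSize: Long,
  private val element1Type: TypeInformation[Row],   // was TypeInformation[CRow]
  private val element2Type: TypeInformation[Row],   // was TypeInformation[CRow]
  private val filterFunc: RichFilterFunction[Row])
  extends CoProcessFunction[CRow, CRow, CRow] {
```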


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The 
> time condition only supports a bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; 
> unbounded conditions like {{o.proctime > s.proctime}} are not supported. It 
> must include the proctime attributes of both streams, so {{o.proctime 
> between proctime() and proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005642#comment-16005642
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115868097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.runtime.FilterRunner
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+    * Analyzes the join condition to extract the equi-condition and the remaining condition.
+    *
+    * @param joinNode   the logical join node
+    * @param expression the function to generate the condition string
+    */
+  private[flink] def analyzeJoinCondition(
+    joinNode: FlinkLogicalJoin,
+    expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
+
+    val joinInfo = joinNode.analyzeCondition()
+    val keyPairs = joinInfo.pairs.toList
+    val otherCondition =
+      if (joinInfo.isEqui) null
+      else joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
+
+    val leftKeys = ArrayBuffer.empty[Int]
+    val rightKeys = ArrayBuffer.empty[Int]
+    if (!keyPairs.isEmpty) {
+      val leftFields = joinNode.getLeft.getRowType.getFieldList
+      val rightFields = joinNode.getRight.getRowType.getFieldList
+
+      keyPairs.foreach(pair => {
+        val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+        val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+
+        // check if keys are compatible
+        if (leftKeyType == rightKeyType) {
+          // add key pair
+          leftKeys.append(pair.source)
+          rightKeys.append(pair.target)
+        } else {
+          throw TableException(
+            "Equality join predicate on incompatible types.\n" +
+              s"\tLeft: ${joinNode.getLeft.toString},\n" +
+              s"\tRight: ${joinNode.getRight.toString},\n" +
+              s"\tCondition: (${expression(joinNode.getCondition,
+                joinNode.getRowType.getFieldNames.toList, None)})"
+          )
+        }
+      })
+    }
+    (leftKeys.toArray, rightKeys.toArray, otherCondition)
+  }
+
+  /**
+    * Analyzes the time condition to determine the time boundary for each stream and the
+    * time predicate type, and returns the condition without the time condition.
+    *
+    * @param condition      the remaining condition, including the time condition
+    * @param leftFieldCount the field count of the left stream
+    * @param inputType      the row type of the connected left and right streams
+    * @param rexBuilder     utility to build RexNodes
+    * @param config         the table environment config
+    */
+  private[flink] def 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005622#comment-16005622
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115850396
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode that represents a join of two streams and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      joinNode,
+      leftSchema,
+      schema,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+
+    s"${joinTypeToString(joinNode.getJoinType)}" +
+      s"(condition: (${joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString)}), " +
+      s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("condition", joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString))
+      .item("select", joinSelectionToString(schema.logicalType))
+      .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+
+    val config = tableEnv.getConfig
+
+    // get the equality keys and the remaining condition
+    val (leftKeys, rightKeys, otherCondition) =
+      JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
--- End diff --

This can be done by 
```
val joinInfo = JoinInfo.of(leftNode, rightNode, condition)
val leftKeys: Array[Int] = joinInfo.leftKeys.toIntArray
val rightKeys: Array[Int] = joinInfo.rightKeys.toIntArray
val otherCondition = joinInfo.getRemaining(cluster.getRexBuilder)
``` 
So we do not need a special method for this. The type checks are not 
required, because Calcite will make sure during validation that only compatible 
types are compared. So we can be sure that types are valid.
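
For reference, a self-contained form of this snippet with the import it assumes 
(`JoinInfo` is Calcite's join-analysis helper; its key lists are 
`ImmutableIntList`s, hence the `toIntArray` calls):

```
import org.apache.calcite.rel.core.JoinInfo

val joinInfo = JoinInfo.of(leftNode, rightNode, joinNode.getCondition)
val leftKeys: Array[Int] = joinInfo.leftKeys.toIntArray
val rightKeys: Array[Int] = joinInfo.rightKeys.toIntArray
val otherCondition = joinInfo.getRemaining(cluster.getRexBuilder)
```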


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
>   

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005635#comment-16005635
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115875501
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream join stream, currently just 
support inner-join
+  *
+  * @param leftStreamWindowSizethe left stream window size
+  * @param rightStreamWindowSizethe right stream window size
+  * @param element1Type  the input type of left stream
+  * @param element2Type  the input type of right stream
+  * @param filterFuncthe function of other non-equi condition include 
time condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 = 
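
The quoted diff is cut off before the processing logic, so for orientation here is a minimal sketch of how a processElement method could use the timer and map state initialized above. It assumes one cleanup timer per key and rows buffered by arrival time; it is not the PR's actual code:

{code}
// Sketch only, not the PR's implementation: how processElement1 could use
// timerState1 and row1MapState declared above.
override def processElement1(
    valueC: CRow,
    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    out: Collector[CRow]): Unit = {

  val curProcessTime = ctx.timerService.currentProcessingTime

  // register a cleanup timer only if none is pending ("0 means no timer")
  if (timerState1.value == 0) {
    val cleanupTime = curProcessTime + leftStreamWindowSize + 1
    ctx.timerService.registerProcessingTimeTimer(cleanupTime)
    timerState1.update(cleanupTime)
  }

  // buffer the row under its arrival timestamp so onTimer can expire it later
  var rowList = row1MapState.get(curProcessTime)
  if (rowList == null) {
    rowList = new util.ArrayList[Row]()
  }
  rowList.add(valueC.row)
  row1MapState.put(curProcessTime, rowList)

  // probing row2MapState for matches within rightStreamWindowSize is omitted here
}
{code}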

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005638#comment-16005638
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115852606
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      joinNode,
+      leftSchema,
+      schema,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+
+    s"${joinTypeToString(joinNode.getJoinType)}" +
+      s"(condition: (${joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString)}), " +
+      s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("condition", joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString))
+      .item("select", joinSelectionToString(schema.logicalType))
+      .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+
+    val config = tableEnv.getConfig
+
+    // get the equality keys and other condition
+    val (leftKeys, rightKeys, otherCondition) =
+      JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+    if (left.isInstanceOf[StreamTableSourceScan]
--- End diff --

I don't think we need this restriction. A `StreamTableSourceScan` produces 
a stream, not a table.
So by this check we would forbid regular stream-stream joins.


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005643#comment-16005643
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115866207
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to join two streams; currently only the inner join
+  * is supported.
+  *
+  * @param leftStreamWindowSize  the left stream window size
+  * @param rightStreamWindowSize the right stream window size
+  * @param element1Type          the input type of the left stream
+  * @param element2Type          the input type of the right stream
+  * @param filterFunc            the function evaluating the remaining non-equi
+  *                              conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 = 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005639#comment-16005639
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115864005
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FilterRunner.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.slf4j.LoggerFactory
+
+class FilterRunner[IN] (
--- End diff --

I don't think we need this additional wrapper. We could just pass the 
generated code of the `FilterFunction` to the `CoProcessFunction` and compile 
it in its open() method.
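
A rough sketch of this approach, assuming the generated code travels as a class name plus source string and is compiled through the table module's `Compiler` trait (the class `CodeGenJoinFilter` and its parameters are illustrative, not from the PR):

{code}
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.table.codegen.Compiler
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Sketch: ship the generated source code to the operator and compile it once
// in open(), so no FilterRunner wrapper is needed.
class CodeGenJoinFilter(
    filterName: String,  // name of the generated class (assumed parameter)
    filterCode: String)  // generated source code (assumed parameter)
  extends CoProcessFunction[CRow, CRow, CRow]
  with Compiler[FilterFunction[Row]] {

  @transient private var filter: FilterFunction[Row] = _

  override def open(config: Configuration): Unit = {
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, filterName, filterCode)
    filter = clazz.newInstance()
  }

  // the processElement methods would apply `filter` to candidate row combinations
  override def processElement1(
      in: CRow, ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow]): Unit = {}

  override def processElement2(
      in: CRow, ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow]): Unit = {}
}
{code}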


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} may only use proctime, which is a system attribute. The 
> time condition only supports a bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not an 
> unbounded one like {{o.proctime > s.proctime}}, and it should include the 
> proctime attributes of both streams; {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005624#comment-16005624
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115796448
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+
+class DataStreamJoinRule
+  extends ConverterRule(
+  classOf[FlinkLogicalJoin],
+  FlinkConventions.LOGICAL,
+  FlinkConventions.DATASTREAM,
+  "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+
+    val joinInfo = join.analyzeCondition
+
+    // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
+    // and disable outer joins with non-equality predicates
+    !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
--- End diff --

Is this condition correct? Are outer joins (incl. LEFT, RIGHT, FULL OUTER) 
supported if the join is an equality join (all conjunctive predicates are 
equality predicates)?
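
If outer joins are in fact not translated yet, one way to make the intent explicit is to restrict `matches` to inner joins outright. A sketch of such a stricter check inside `DataStreamJoinRule` (not the PR's code):

{code}
// Sketch: accept only inner joins with at least one equi-condition until
// outer joins are actually supported by the translation.
override def matches(call: RelOptRuleCall): Boolean = {
  val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
  val joinInfo = join.analyzeCondition

  val hasEquiCondition = !joinInfo.pairs().isEmpty
  val isInnerJoin = join.getJoinType == JoinRelType.INNER
  hasEquiCondition && isInnerJoin
}
{code}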


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} may only use proctime, which is a system attribute. The 
> time condition only supports a bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not an 
> unbounded one like {{o.proctime > s.proctime}}, and it should include the 
> proctime attributes of both streams; {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005627#comment-16005627
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115874222
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      joinNode,
+      leftSchema,
+      schema,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+
+    s"${joinTypeToString(joinNode.getJoinType)}" +
+      s"(condition: (${joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString)}), " +
+      s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("condition", joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString))
+      .item("select", joinSelectionToString(schema.logicalType))
+      .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+
+    val config = tableEnv.getConfig
+
+    // get the equality keys and other condition
+    val (leftKeys, rightKeys, otherCondition) =
+      JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+    if (left.isInstanceOf[StreamTableSourceScan]
+      || right.isInstanceOf[StreamTableSourceScan]) {
+      throw new TableException(
+        "Join between stream and table is not supported yet.")
+    }
+    // analyze time boundary and time predicate type (proctime/rowtime)
+    val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) =
+      JoinUtil.analyzeTimeBoundary(
+        otherCondition,
+        leftSchema.logicalType.getFieldCount,
+        leftSchema.physicalType.getFieldCount,
+        schema.logicalType,
+        joinNode.getCluster.getRexBuilder,
+        config)
+
+    val leftDataStream = 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005647#comment-16005647
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115873581
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to join two streams; currently only the inner join
+  * is supported.
+  *
+  * @param leftStreamWindowSize  the left stream window size
+  * @param rightStreamWindowSize the right stream window size
+  * @param element1Type          the input type of the left stream
+  * @param element2Type          the input type of the right stream
+  * @param filterFunc            the function evaluating the remaining non-equi
+  *                              conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 = 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005625#comment-16005625
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115851247
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.runtime.FilterRunner
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+    * Analyze the join condition to extract the equi-condition and the
+    * remaining condition.
+    *
+    * @param joinNode   the logical join node
+    * @param expression the function to generate the condition string
+    */
+  private[flink] def analyzeJoinCondition(
--- End diff --

I don't think we need this method. We can analyze the condition with 
Calcite's `JoinInfo`. Calcite's validation checks before optimization and 
translation that the types of the conditions are OK.
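
For illustration, a sketch of how the split could be derived from Calcite's `JoinInfo` directly (the helper name `splitJoinCondition` is made up for this example):

{code}
import org.apache.calcite.rel.core.Join
import org.apache.calcite.rex.RexNode

// Sketch: let JoinInfo provide the equi-join keys and the remaining predicate.
def splitJoinCondition(join: Join): (Array[Int], Array[Int], Option[RexNode]) = {
  val joinInfo = join.analyzeCondition()
  val leftKeys = joinInfo.leftKeys.toIntArray
  val rightKeys = joinInfo.rightKeys.toIntArray
  val remaining = joinInfo.getRemaining(join.getCluster.getRexBuilder)
  // an always-true remainder means the condition was purely an equi-join
  val otherCondition = if (remaining.isAlwaysTrue) None else Some(remaining)
  (leftKeys, rightKeys, otherCondition)
}
{code}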


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} may only use proctime, which is a system attribute. The 
> time condition only supports a bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not an 
> unbounded one like {{o.proctime > s.proctime}}, and it should include the 
> proctime attributes of both streams; {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005640#comment-16005640
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115868422
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to join two streams; currently only the inner join
+  * is supported.
+  *
+  * @param leftStreamWindowSize  the left stream window size
+  * @param rightStreamWindowSize the right stream window size
+  * @param element1Type          the input type of the left stream
+  * @param element2Type          the input type of the right stream
+  * @param filterFunc            the function evaluating the remaining non-equi
+  *                              conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
--- End diff --

Use a `JoinFunction` instead of a `FilterFunction`, and compile the code here 
instead of using a wrapper.
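
To illustrate the suggestion: with a generated `JoinFunction`, evaluating the remaining predicate and assembling the output row collapse into a single call. The null-on-mismatch convention below is an assumption for this sketch, not something defined by the PR:

{code}
import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.types.Row

// Sketch: cross the buffered rows of one side with a newly arrived row
// through a (code-generated) JoinFunction.
def joinBufferedRows(
    bufferedRows: JList[Row],
    newRow: Row,
    joinFunc: JoinFunction[Row, Row, Row]): JList[Row] = {
  val output = new JArrayList[Row]()
  val it = bufferedRows.iterator()
  while (it.hasNext) {
    val joined = joinFunc.join(it.next(), newRow)
    if (joined != null) { // assumed: generated code returns null if the predicate fails
      output.add(joined)
    }
  }
  output
}
{code}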


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} may only use proctime, which is a system attribute. The 
> time condition only supports bounded 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005628#comment-16005628
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115847866
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
--- End diff --

why don't we need the `rightSchema`?


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} may only use proctime, which is a system attribute. The 
> time condition only supports a bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not an 
> unbounded one like {{o.proctime > s.proctime}}, and it should include the 
> proctime attributes of both streams; {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005626#comment-16005626
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115873251
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to join two streams; currently only the inner join
+  * is supported.
+  *
+  * @param leftStreamWindowSize  the left stream window size
+  * @param rightStreamWindowSize the right stream window size
+  * @param element1Type          the input type of the left stream
+  * @param element2Type          the input type of the right stream
+  * @param filterFunc            the function evaluating the remaining non-equi
+  *                              conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 = 

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005629#comment-16005629
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115844805
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+
+class DataStreamJoinRule
+  extends ConverterRule(
+  classOf[FlinkLogicalJoin],
+  FlinkConventions.LOGICAL,
+  FlinkConventions.DATASTREAM,
+  "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+
+    val joinInfo = join.analyzeCondition
+
+    // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
+    // and disable outer joins with non-equality predicates
+    !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
--- End diff --

I assume not, because the join type is not passed on to the 
`DataStreamJoin`.


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} may only use proctime, which is a system attribute. The 
> time condition only supports a bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not an 
> unbounded one like {{o.proctime > s.proctime}}, and it should include the 
> proctime attributes of both streams; {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115855446
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      joinNode,
+      leftSchema,
+      schema,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+
+    s"${joinTypeToString(joinNode.getJoinType)}" +
+      s"(condition: (${joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString)}), " +
+      s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("condition", joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString))
+      .item("select", joinSelectionToString(schema.logicalType))
+      .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+
+    val config = tableEnv.getConfig
+
+    // get the equality keys and other condition
+    val (leftKeys, rightKeys, otherCondition) =
+      JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+    if (left.isInstanceOf[StreamTableSourceScan]
+      || right.isInstanceOf[StreamTableSourceScan]) {
+      throw new TableException(
+        "Join between stream and table is not supported yet.")
+    }
+    // analyze time boundary and time predicate type (proctime/rowtime)
+    val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) =
+      JoinUtil.analyzeTimeBoundary(
+        otherCondition,
+        leftSchema.logicalType.getFieldCount,
+        leftSchema.physicalType.getFieldCount,
+        schema.logicalType,
+        joinNode.getCluster.getRexBuilder,
+        config)
+
+    val leftDataStream = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val rightDataStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+    // generate other condition filter function
+    val filterFunction =
+      

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115873645
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to join two streams; currently only the inner join
+  * is supported.
+  *
+  * @param leftStreamWindowSize  the left stream window size
+  * @param rightStreamWindowSize the right stream window size
+  * @param element1Type          the input type of the left stream
+  * @param element2Type          the input type of the right stream
+  * @param filterFunc            the function evaluating the remaining non-equi
+  *                              conditions, including the time condition
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
+
+    val valueStateDescriptor2: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+    timerState2 = 

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115868097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.runtime.FilterRunner
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze the join condition to get the equi-condition and the remaining condition.
+* @param  joinNode   the logical join node
+* @param  expression the function to generate the condition string
+*/
+  private[flink] def analyzeJoinCondition(
+joinNode: FlinkLogicalJoin,
+expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
+
+val joinInfo = joinNode.analyzeCondition()
+val keyPairs = joinInfo.pairs.toList
+val otherCondition =
+  if(joinInfo.isEqui) null
+  else joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
+
+val leftKeys = ArrayBuffer.empty[Int]
+val rightKeys = ArrayBuffer.empty[Int]
+if (!keyPairs.isEmpty) {
+  val leftFields = joinNode.getLeft.getRowType.getFieldList
+  val rightFields = joinNode.getRight.getRowType.getFieldList
+
+  keyPairs.foreach(pair => {
+val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+
+// check if keys are compatible
+if (leftKeyType == rightKeyType) {
+  // add key pair
+  leftKeys.append(pair.source)
+  rightKeys.append(pair.target)
+} else {
+  throw TableException(
+"Equality join predicate on incompatible types.\n" +
+  s"\tLeft: ${joinNode.getLeft.toString},\n" +
+  s"\tRight: ${joinNode.getRight.toString},\n" +
+  s"\tCondition: (${expression(joinNode.getCondition,
+joinNode.getRowType.getFieldNames.toList, None)})"
+  )
+}
+  })
+}
+(leftKeys.toArray, rightKeys.toArray, otherCondition)
+  }
+
+  /**
+* Analyze the time condition to get the time boundary for each stream and the time type,
+* and return the condition without the time condition.
+*
+* @param  condition      the remaining condition, including the time condition
+* @param  leftFieldCount the field count of the left stream
+* @param  inputType      the row type of the connected left and right streams
+* @param  rexBuilder     utility to build RexNodes
+* @param  config         the table environment config
+*/
+  private[flink] def analyzeTimeBoundary(
--- End diff --

I did not go through the details of this method yet, but apparently it 
analyzes and decomposes the time-based condition. It would be very good to 
have a couple of unit tests which only check this method.
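
A minimal sketch of such a test (assumptions: JUnit 4, and that 
`analyzeTimeBoundary` keeps the argument and result shape visible at its call 
site in `DataStreamJoin`; the expected boundary values are illustrative only):

```scala
import org.apache.calcite.rel.`type`.RelDataTypeSystem
import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.junit.Assert.assertEquals
import org.junit.Test

class JoinUtilTest {

  @Test
  def testAnalyzeProcTimeBoundary(): Unit = {
    val typeFactory = new FlinkTypeFactory(RelDataTypeSystem.DEFAULT)
    val rexBuilder = new RexBuilder(typeFactory)

    // row type of the connected inputs: one proctime field per side
    val rowType = typeFactory.builder()
      .add("lProctime", SqlTypeName.TIMESTAMP)
      .add("rProctime", SqlTypeName.TIMESTAMP)
      .build()

    // condition under test: lProctime >= rProctime (a stand-in for a real range predicate)
    val condition = rexBuilder.makeCall(
      SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
      rexBuilder.makeInputRef(rowType.getFieldList.get(0).getType, 0),
      rexBuilder.makeInputRef(rowType.getFieldList.get(1).getType, 1))

    val (timeType, leftSize, rightSize, remaining) =
      JoinUtil.analyzeTimeBoundary(condition, 1, 1, rowType, rexBuilder, new TableConfig)

    // the expected boundaries below are assumptions for illustration
    assertEquals(0L, leftSize)
    assertEquals(0L, rightSize)
  }
}
```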

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115851247
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala
 ---
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.runtime.FilterRunner
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+* Analyze the join condition to get the equi-condition and the remaining condition.
+* @param  joinNode   the logical join node
+* @param  expression the function to generate the condition string
+*/
+  private[flink] def analyzeJoinCondition(
--- End diff --

I don't think we need this method. We can analyze the condition with 
Calcite's `JoinInfo`. Calcite's validation checks before optimization and 
translation that the types of the conditions are OK.
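
A rough sketch of what that could look like (a sketch built on the 
`FlinkLogicalJoin` node from the PR, not the PR's code):

```scala
import org.apache.calcite.rex.RexNode
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin

object JoinConditionSketch {
  // Calcite's JoinInfo already splits the condition into equi key pairs and
  // the remaining predicate, so no hand-written analysis is needed.
  def extractJoinKeys(joinNode: FlinkLogicalJoin): (Array[Int], Array[Int], RexNode) = {
    val joinInfo = joinNode.analyzeCondition()
    // getRemaining returns a TRUE literal when the condition is purely equi
    val remaining = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
    (joinInfo.leftKeys.toIntArray, joinInfo.rightKeys.toIntArray, remaining)
  }
}
```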




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115866207
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream-to-stream joins; currently only the inner join is supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi conditions, including the time condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+filterFunc.setRuntimeContext(getRuntimeContext)
+filterFunc.open(config)
+
+listToRemove = new util.ArrayList[Long]()
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] =
+new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+val rowListTypeInfo2: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+// initialize timer state
+val valueStateDescriptor1: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
+
+val valueStateDescriptor2: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+timerState2 = getRuntimeContext.getState(valueStateDescriptor2)

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115873251
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream-to-stream joins; currently only the inner join is supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi conditions, including the time condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+filterFunc.setRuntimeContext(getRuntimeContext)
+filterFunc.open(config)
+
+listToRemove = new util.ArrayList[Long]()
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] =
+new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+val rowListTypeInfo2: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+// initialize timer state
+val valueStateDescriptor1: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
+
+val valueStateDescriptor2: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+timerState2 = getRuntimeContext.getState(valueStateDescriptor2)

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115868597
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream-to-stream joins; currently only the inner join is supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi conditions, including the time condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+filterFunc.setRuntimeContext(getRuntimeContext)
+filterFunc.open(config)
+
+listToRemove = new util.ArrayList[Long]()
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] =
+new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
--- End diff --

indent
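
For reference, the flagged continuation line with the indentation it 
presumably should get:

```scala
val rowListTypeInfo1: TypeInformation[JList[Row]] =
  new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
```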




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115874115
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FilterRunner.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.slf4j.LoggerFactory
+
+class FilterRunner[IN] (
--- End diff --

Actually, we could use a `JoinFunction` instead of a `FilterFunction` as 
well.
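
A sketch of what the generated function could look like at runtime, reusing 
the `join(Object, Object, Collector)` shape the code generator already emits 
(the class and the predicate placeholder are hypothetical, not the PR's 
generated code):

```scala
import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Evaluates the non-equi predicate and emits the concatenated row only when
// it holds, so no separate FilterFunction over a pre-merged row is needed.
class ConditionEvaluatingJoin extends FlatJoinFunction[Row, Row, Row] {
  override def join(left: Row, right: Row, out: Collector[Row]): Unit = {
    val predicateHolds = true // placeholder for the generated predicate
    if (predicateHolds) {
      val joined = new Row(left.getArity + right.getArity)
      var i = 0
      while (i < left.getArity) { joined.setField(i, left.getField(i)); i += 1 }
      var j = 0
      while (j < right.getArity) { joined.setField(left.getArity + j, right.getField(j)); j += 1 }
      out.collect(joined)
    }
  }
}
```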




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115796448
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+
+class DataStreamJoinRule
+  extends ConverterRule(
+  classOf[FlinkLogicalJoin],
+  FlinkConventions.LOGICAL,
+  FlinkConventions.DATASTREAM,
+  "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+
+val joinInfo = join.analyzeCondition
+
+// joins require an equi-condition or a conjunctive predicate with at least one equi-condition
+// and disable outer joins with non-equality predicates
+!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
--- End diff --

Is this condition correct? Are outer joins (incl. LEFT, RIGHT, FULL OUTER) 
supported if the join is an equality join (all conjunctive predicates are 
equality predicates)?
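
For reference, one reading of the current predicate (just spelling out the 
cases, not new behavior):

```
!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)

  equi keys present, purely-equi condition     -> matches for any join type (incl. OUTER)
  equi keys present, extra non-equi predicates -> matches only for INNER
  no equi keys                                 -> never matches
```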




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115874289
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -698,6 +698,12 @@ class CodeGenerator(
   s"void join(Object _in1, Object _in2, 
org.apache.flink.util.Collector $collectorTerm)",
   List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
+  } else if (clazz == classOf[FilterFunction[_]]) {
--- End diff --

Not needed if we use `JoinFunction` instead of `FilterFunction`




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115866096
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream-to-stream joins; currently only the inner join is supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi conditions, including the time condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+filterFunc.setRuntimeContext(getRuntimeContext)
+filterFunc.open(config)
+
+listToRemove = new util.ArrayList[Long]()
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] =
+new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+val rowListTypeInfo2: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+// initialize timer state
+val valueStateDescriptor1: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
+
+val valueStateDescriptor2: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+timerState2 = getRuntimeContext.getState(valueStateDescriptor2)

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115864221
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinNode,
+  leftSchema,
+  schema,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+
+s"${joinTypeToString(joinNode.getJoinType)}" +
+  s"(condition: (${joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString)}), " +
+  s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("condition", joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString))
+  .item("select", joinSelectionToString(schema.logicalType))
+  .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+// get the equality keys and other condition
+val (leftKeys, rightKeys, otherCondition) =
+  JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+if (left.isInstanceOf[StreamTableSourceScan]
+|| right.isInstanceOf[StreamTableSourceScan]) {
+  throw new TableException(
+"Join between stream and table is not supported yet.")
+}
+// analyze time boundary and time predicate type (proctime/rowtime)
+val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) =
+  JoinUtil.analyzeTimeBoundary(
+otherCondition,
+leftSchema.logicalType.getFieldCount,
+leftSchema.physicalType.getFieldCount,
+schema.logicalType,
+joinNode.getCluster.getRexBuilder,
+config)
+
+val leftDataStream = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+val rightDataStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+// generate other condition filter function
+val filterFunction =
--- End diff --

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115864005
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FilterRunner.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.slf4j.LoggerFactory
+
+class FilterRunner[IN] (
--- End diff --

I don't think we need this additional wrapper. We could just pass the 
generated code of the `FilterFunction` to the `CoProcessFunction` and compile 
it in its open() method.
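
A minimal sketch of that approach (constructor shape and names hypothetical; 
the join logic itself is elided):

```scala
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.table.codegen.Compiler
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

class ProcTimeInnerJoinSketch(
    private val funcName: String,
    private val funcCode: String)
  extends CoProcessFunction[CRow, CRow, CRow]
  with Compiler[FilterFunction[Row]] {

  private var filterFunc: FilterFunction[Row] = _

  override def open(config: Configuration): Unit = {
    // compile the generated FilterFunction once per task instead of
    // shipping a FilterRunner wrapper
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, funcName, funcCode)
    filterFunc = clazz.newInstance()
  }

  override def processElement1(
      value: CRow,
      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
      out: Collector[CRow]): Unit = ??? // join logic as in the PR

  override def processElement2(
      value: CRow,
      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
      out: Collector[CRow]): Unit = ??? // join logic as in the PR
}
```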




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115852606
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinNode,
+  leftSchema,
+  schema,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+
+s"${joinTypeToString(joinNode.getJoinType)}" +
+  s"(condition: (${joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString)}), " +
+  s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("condition", joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString))
+  .item("select", joinSelectionToString(schema.logicalType))
+  .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+// get the equality keys and other condition
+val (leftKeys, rightKeys, otherCondition) =
+  JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+if (left.isInstanceOf[StreamTableSourceScan]
--- End diff --

I don't think we need this restriction. A `StreamTableSourceScan` produces a 
stream, not a table, so this check would forbid regular stream-stream joins.




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115873581
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream-to-stream joins; currently only the inner join is supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi conditions, including the time condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+filterFunc.setRuntimeContext(getRuntimeContext)
+filterFunc.open(config)
+
+listToRemove = new util.ArrayList[Long]()
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] =
+new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+val rowListTypeInfo2: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+// initialize timer state
+val valueStateDescriptor1: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
+
+val valueStateDescriptor2: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+timerState2 = getRuntimeContext.getState(valueStateDescriptor2)

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115845838
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+
+class DataStreamJoinRule
+  extends ConverterRule(
+  classOf[FlinkLogicalJoin],
+  FlinkConventions.LOGICAL,
+  FlinkConventions.DATASTREAM,
+  "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+
+val joinInfo = join.analyzeCondition
+
+// joins require an equi-condition or a conjunctive predicate with at least one equi-condition
+// and disable outer joins with non-equality predicates
+!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+
+val join: FlinkLogicalJoin = rel.asInstanceOf[FlinkLogicalJoin]
+val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+val convLeft: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.DATASTREAM)
+val convRight: RelNode = RelOptRule.convert(join.getInput(1), FlinkConventions.DATASTREAM)
+
+new DataStreamJoin(
+  rel.getCluster,
+  traitSet,
+  convLeft,
+  convRight,
+  join,
--- End diff --

Please pass the join condition and join type instead of the 
`FlinkLogicalJoin`. That is a node of the logical plan which should not be 
part of a node in the physical plan.
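
A sketch of the suggested node signature (parameter names illustrative; 
methods other than `deriveRowType` and `copy` elided):

```scala
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.{BiRel, RelNode}
import org.apache.calcite.rex.RexNode
import org.apache.flink.table.plan.schema.RowSchema

class DataStreamJoinSketch(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    leftNode: RelNode,
    rightNode: RelNode,
    joinCondition: RexNode,   // instead of the whole FlinkLogicalJoin
    joinType: JoinRelType,    // instead of the whole FlinkLogicalJoin
    leftSchema: RowSchema,
    schema: RowSchema,
    ruleDescription: String)
  extends BiRel(cluster, traitSet, leftNode, rightNode) {

  override def deriveRowType() = schema.logicalType

  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode =
    new DataStreamJoinSketch(cluster, traitSet, inputs.get(0), inputs.get(1),
      joinCondition, joinType, leftSchema, schema, ruleDescription)
}
```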




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115871238
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream-to-stream joins; currently only the inner join is supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi conditions, including the time condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+filterFunc.setRuntimeContext(getRuntimeContext)
+filterFunc.open(config)
+
+listToRemove = new util.ArrayList[Long]()
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] =
+new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+val rowListTypeInfo2: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+// initialize timer state
+val valueStateDescriptor1: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
+
+val valueStateDescriptor2: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+timerState2 = getRuntimeContext.getState(valueStateDescriptor2)

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115872865
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream-to-stream joins; currently only the inner join is supported.
+  *
+  * @param leftStreamWindowSize   the left stream window size
+  * @param rightStreamWindowSize  the right stream window size
+  * @param element1Type           the input type of the left stream
+  * @param element2Type           the input type of the right stream
+  * @param filterFunc             the function evaluating the remaining non-equi conditions, including the time condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
--- End diff --

What are the timer states used for? I only see one `value()` call, which 
checks `== 0` to determine whether a timer is registered. Can we make this a 
boolean state then? 
What would happen if we did not have this state? Would there be more 
timers (timers are unique by time)?
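
A minimal sketch of that boolean-state variant (descriptor name and the call 
sites shown in the comments are hypothetical, not part of this PR):

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

object BooleanTimerStateSketch {
  // a flag that only records whether a cleanup timer is currently registered
  val timerRegisteredDesc: ValueStateDescriptor[Boolean] =
    new ValueStateDescriptor[Boolean]("timerRegistered", classOf[Boolean])

  // inside processElement1/processElement2 one would then write, roughly:
  //   val timerRegistered: ValueState[Boolean] =
  //     getRuntimeContext.getState(timerRegisteredDesc)
  //   if (!timerRegistered.value()) {
  //     ctx.timerService().registerProcessingTimeTimer(cleanupTime)
  //     timerRegistered.update(true)
  //   }
}
```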




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115844805
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+
+class DataStreamJoinRule
+  extends ConverterRule(
+  classOf[FlinkLogicalJoin],
+  FlinkConventions.LOGICAL,
+  FlinkConventions.DATASTREAM,
+  "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+
+val joinInfo = join.analyzeCondition
+
+// joins require an equi-condition or a conjunctive predicate with at least
+// one equi-condition; outer joins with non-equality predicates are disabled
+!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == 
JoinRelType.INNER)
--- End diff --

I assume not, because the join type is not passed on to the 
`DataStreamJoin`.
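
For illustration, a sketch of how the rule could state the inner-join 
restriction explicitly (assuming outer joins stay unsupported at runtime; 
this is not the code under review):

```scala
import org.apache.calcite.plan.RelOptRuleCall
import org.apache.calcite.rel.core.JoinRelType
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin

object InnerEquiJoinMatchSketch {
  // accept only inner joins that carry at least one equi-condition, so the
  // join type can never silently degrade during execution
  def matches(call: RelOptRuleCall): Boolean = {
    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
    val joinInfo = join.analyzeCondition
    !joinInfo.pairs().isEmpty && join.getJoinType == JoinRelType.INNER
  }
}
```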




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115852788
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which translates to a join operator and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinNode,
+  leftSchema,
+  schema,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+
+s"${joinTypeToString(joinNode.getJoinType)}" +
+  s"(condition: (${joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString)}), " +
+  s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("condition", joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString))
+  .item("select", joinSelectionToString(schema.logicalType))
+  .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+// get the equality keys and other condition
+val (leftKeys, rightKeys, otherCondition) =
+  JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+if (left.isInstanceOf[StreamTableSourceScan]
+|| right.isInstanceOf[StreamTableSourceScan]) {
+  throw new TableException(
+"Join between stream and table is not supported yet.")
+}
+// analyze time boundary and time predicate type (proctime/rowtime)
+val (timeType, leftStreamWindowSize, rightStreamWindowSize, 
conditionWithoutTime) =
+  JoinUtil.analyzeTimeBoundary(
+otherCondition,
+leftSchema.logicalType.getFieldCount,
+leftSchema.physicalType.getFieldCount,
+schema.logicalType,
+joinNode.getCluster.getRexBuilder,
+config)
+
+val leftDataStream = 
left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+val rightDataStream = 
right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+// generate other condition filter function
+val filterFunction =
+  

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115864982
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction that joins two streams; currently only inner joins
+  * are supported.
+  *
+  * @param leftStreamWindowSize  the window size of the left stream
+  * @param rightStreamWindowSize the window size of the right stream
+  * @param element1Type          the input type of the left stream
+  * @param element2Type          the input type of the right stream
+  * @param filterFunc            the function evaluating the remaining
+  *                              non-equi conditions, including the time
+  *                              condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
--- End diff --

I think we can use `TypeInformation[Row]` here and don't need the `CRow` 
wrapper.
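
A minimal sketch of what that would simplify, assuming the constructor 
received `TypeInformation[Row]` directly (object and method names are 
hypothetical):

```scala
import java.util.{List => JList}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.types.Row

object RowTypeInfoSketch {
  // with TypeInformation[Row] as input, open() can build the list state type
  // without the asInstanceOf[CRowTypeInfo].rowType cast
  def rowListType(rowType: TypeInformation[Row]): TypeInformation[JList[Row]] =
    new ListTypeInfo[Row](rowType)
}
```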




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115865585
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction that joins two streams; currently only inner joins
+  * are supported.
+  *
+  * @param leftStreamWindowSize  the window size of the left stream
+  * @param rightStreamWindowSize the window size of the right stream
+  * @param element1Type          the input type of the left stream
+  * @param element2Type          the input type of the right stream
+  * @param filterFunc            the function evaluating the remaining
+  *                              non-equi conditions, including the time
+  *                              condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+outputC = new CRow(new Row(element1Type.getArity + 
element2Type.getArity), true)
+filterFunc.setRuntimeContext(getRuntimeContext)
+filterFunc.open(config)
+
+listToRemove = new util.ArrayList[Long]()
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] =
+new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo1)
+row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+val rowListTypeInfo2: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo2)
+row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+// initialize timer state
+val valueStateDescriptor1: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
+
+val valueStateDescriptor2: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+timerState2 = getRuntimeContext.getState(valueStateDescriptor2)

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115847719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which translates to a join operator and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
--- End diff --

please do not include a `FlinkLogicalJoin` node but the condition and the 
join type.
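
A rough sketch of the suggested parameter list (class name hypothetical, body 
omitted; the real node would still extend `BiRel`, `CommonJoin` and 
`DataStreamRel` as in the diff above):

```scala
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rex.RexNode
import org.apache.flink.table.plan.schema.RowSchema

abstract class DataStreamJoinSketch(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    leftNode: RelNode,
    rightNode: RelNode,
    joinCondition: RexNode,  // instead of the whole FlinkLogicalJoin node
    joinType: JoinRelType,   // instead of joinNode.getJoinType
    leftSchema: RowSchema,
    schema: RowSchema,
    ruleDescription: String)
```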




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115874222
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which translates to a join operator and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinNode,
+  leftSchema,
+  schema,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+
+s"${joinTypeToString(joinNode.getJoinType)}" +
+  s"(condition: (${joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString)}), " +
+  s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("condition", joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString))
+  .item("select", joinSelectionToString(schema.logicalType))
+  .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+// get the equality keys and other condition
+val (leftKeys, rightKeys, otherCondition) =
+  JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+if (left.isInstanceOf[StreamTableSourceScan]
+|| right.isInstanceOf[StreamTableSourceScan]) {
+  throw new TableException(
+"Join between stream and table is not supported yet.")
+}
+// analyze time boundary and time predicate type (proctime/rowtime)
+val (timeType, leftStreamWindowSize, rightStreamWindowSize, 
conditionWithoutTime) =
+  JoinUtil.analyzeTimeBoundary(
+otherCondition,
+leftSchema.logicalType.getFieldCount,
+leftSchema.physicalType.getFieldCount,
+schema.logicalType,
+joinNode.getCluster.getRexBuilder,
+config)
+
+val leftDataStream = 
left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+val rightDataStream = 
right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+// generate other condition filter function
+val filterFunction =
+  

[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115847866
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which translates to a join operator and its related operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
--- End diff --

why don't we need the `rightSchema`?




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115850396
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related 
operations.
+  */
+class DataStreamJoin(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   leftNode: RelNode,
+   rightNode: RelNode,
+   joinNode: FlinkLogicalJoin,
+   leftSchema: RowSchema,
+   schema: RowSchema,
+   ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinNode,
+  leftSchema,
+  schema,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+
+s"${joinTypeToString(joinNode.getJoinType)}" +
+  s"(condition: (${joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString)}), " +
+  s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("condition", joinConditionToString(schema.logicalType,
+joinNode.getCondition, getExpressionString))
+  .item("select", joinSelectionToString(schema.logicalType))
+  .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+// get the equality keys and other condition
+val (leftKeys, rightKeys, otherCondition) =
+  JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
--- End diff --

This can be done by 
```
val joinInfo = JoinInfo.of(leftNode, rightNode, condition)
val leftKeys: Array[Int] = joinInfo.leftKeys.toIntArray
val rightKeys: Array[Int] = joinInfo.rightKeys.toIntArray
val otherCondition = joinInfo.getRemaining(cluster.getRexBuilder)
``` 
So we do not need a special method for this. The type checks are not 
required, because Calcite will make sure during validation that only compatible 
types are compared. So we can be sure that types are valid.




[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...

2017-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3715#discussion_r115875501
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction that joins two streams; currently only inner joins
+  * are supported.
+  *
+  * @param leftStreamWindowSize  the window size of the left stream
+  * @param rightStreamWindowSize the window size of the right stream
+  * @param element1Type          the input type of the left stream
+  * @param element2Type          the input type of the right stream
+  * @param filterFunc            the function evaluating the remaining
+  *                              non-equi conditions, including the time
+  *                              condition
+  *
+  */
+class ProcTimeInnerJoin(
+  private val leftStreamWindowSize: Long,
+  private val rightStreamWindowSize: Long,
+  private val element1Type: TypeInformation[CRow],
+  private val element2Type: TypeInformation[CRow],
+  private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+
+  override def open(config: Configuration) {
+outputC = new CRow(new Row(element1Type.getArity + 
element2Type.getArity), true)
+filterFunc.setRuntimeContext(getRuntimeContext)
+filterFunc.open(config)
+
+listToRemove = new util.ArrayList[Long]()
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] =
+new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo1)
+row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+val rowListTypeInfo2: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo2)
+row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+// initialize timer state
+val valueStateDescriptor1: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
+
+val valueStateDescriptor2: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+timerState2 = getRuntimeContext.getState(valueStateDescriptor2)

[jira] [Comment Edited] (FLINK-5536) Config option: HA

2017-05-10 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005585#comment-16005585
 ] 

Stavros Kontopoulos edited comment on FLINK-5536 at 5/10/17 10:43 PM:
--

[~eronwright]
https://github.com/mesosphere/dcos-flink-service/pull/23
https://github.com/mesosphere/universe/pull/1163


was (Author: skonto):
https://github.com/mesosphere/dcos-flink-service/pull/23
https://github.com/mesosphere/universe/pull/1163

> Config option: HA
> -
>
> Key: FLINK-5536
> URL: https://issues.apache.org/jira/browse/FLINK-5536
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos
>Reporter: Eron Wright 
>Assignee: Stavros Kontopoulos
>
> Configure Flink HA thru package options plus good defaults.   The main 
> components are ZK configuration and state backend configuration.
> - The ZK information can be defaulted to `master.mesos` as with other packages
> - Evaluate whether ZK can be fully configured by default, even if a state 
> backend isn't configured.
> - Use DCOS HDFS as the filesystem for the state backend.  Evaluate whether to 
> assume that DCOS HDFS is installed by default, or whether to make it explicit.
> - To use DCOS HDFS, the init script should download the core-site.xml and 
> hdfs-site.xml from the HDFS 'connection' endpoint.   Supply a default value 
> for the endpoint address; see 
> [https://docs.mesosphere.com/service-docs/hdfs/connecting-clients/].





[jira] [Commented] (FLINK-5536) Config option: HA

2017-05-10 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005585#comment-16005585
 ] 

Stavros Kontopoulos commented on FLINK-5536:


https://github.com/mesosphere/dcos-flink-service/pull/23
https://github.com/mesosphere/universe/pull/1163

> Config option: HA
> -
>
> Key: FLINK-5536
> URL: https://issues.apache.org/jira/browse/FLINK-5536
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos
>Reporter: Eron Wright 
>Assignee: Stavros Kontopoulos
>
> Configure Flink HA thru package options plus good defaults.   The main 
> components are ZK configuration and state backend configuration.
> - The ZK information can be defaulted to `master.mesos` as with other packages
> - Evaluate whether ZK can be fully configured by default, even if a state 
> backend isn't configured.
> - Use DCOS HDFS as the filesystem for the state backend.  Evaluate whether to 
> assume that DCOS HDFS is installed by default, or whether to make it explicit.
> - To use DCOS HDFS, the init script should download the core-site.xml and 
> hdfs-site.xml from the HDFS 'connection' endpoint.   Supply a default value 
> for the endpoint address; see 
> [https://docs.mesosphere.com/service-docs/hdfs/connecting-clients/].





[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005509#comment-16005509
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r115861002
  
--- Diff: flink-connectors/flink-connector-cassandra/pom.xml ---
@@ -176,5 +176,23 @@ under the License.



+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+   </dependency>
+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala_2.10</artifactId>
--- End diff --

This is interesting as now `flink-table_2.10` no longer pulls in 
`flink-streaming-scala_2.10` as a transitive dependency. Therefore the change 
needs to introduce the dependency in order to compile anything against the 
`flink-streaming-scala` module (e.g., `StreamExecutionEnvironment`).


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.





[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-05-10 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r115861002
  
--- Diff: flink-connectors/flink-connector-cassandra/pom.xml ---
@@ -176,5 +176,23 @@ under the License.



+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+   </dependency>
+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala_2.10</artifactId>
--- End diff --

This is interesting as now `flink-table_2.10` no longer pulls in 
`flink-streaming-scala_2.10` as a transitive dependency. Therefore the change 
needs to introduce the dependency in order to compile anything against the 
`flink-streaming-scala` module (e.g., `StreamExecutionEnvironment`).




[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005494#comment-16005494
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r115859400
  
--- Diff: flink-connectors/flink-connector-cassandra/pom.xml ---
@@ -176,5 +176,23 @@ under the License.



+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+   </dependency>
+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala_2.10</artifactId>
--- End diff --

Do we actually depend on the scala dependencies?


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.





[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-05-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r115859400
  
--- Diff: flink-connectors/flink-connector-cassandra/pom.xml ---
@@ -176,5 +176,23 @@ under the License.



+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+   </dependency>
+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala_2.10</artifactId>
--- End diff --

Do we actually depend on the scala dependencies?




[jira] [Commented] (FLINK-6531) Deserialize checkpoint hooks with user classloader

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005401#comment-16005401
 ] 

ASF GitHub Bot commented on FLINK-6531:
---

Github user EronWright closed the pull request at:

https://github.com/apache/flink/pull/3867


> Deserialize checkpoint hooks with user classloader
> --
>
> Key: FLINK-6531
> URL: https://issues.apache.org/jira/browse/FLINK-6531
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The checkpoint hooks introduced in FLINK-6390 aren't being deserialized with 
> the user classloader, breaking remote execution.
> Remote execution produces a `ClassNotFoundException` as the job graph is 
> transferred from the client to the JobManager.





[GitHub] flink pull request #3867: [FLINK-6531] Deserialize checkpoint hooks with use...

2017-05-10 Thread EronWright
Github user EronWright closed the pull request at:

https://github.com/apache/flink/pull/3867




[GitHub] flink issue #3868: [FLINK-6532] [checkpoints] Ensure proper classloading for...

2017-05-10 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/3868
  
The title should refer to FLINK-6531




[jira] [Commented] (FLINK-6531) Deserialize checkpoint hooks with user classloader

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005364#comment-16005364
 ] 

ASF GitHub Bot commented on FLINK-6531:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3867
  
@EronWright Here is a slightly modified version of your code: 
https://github.com/apache/flink/pull/3868

I added a small test to also validate the class loading.


> Deserialize checkpoint hooks with user classloader
> --
>
> Key: FLINK-6531
> URL: https://issues.apache.org/jira/browse/FLINK-6531
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The checkpoint hooks introduced in FLINK-6390 aren't being deserialized with 
> the user classloader, breaking remote execution.
> Remote execution produces a `ClassNotFoundException` as the job graph is 
> transferred from the client to the JobManager.





[GitHub] flink issue #3867: [FLINK-6531] Deserialize checkpoint hooks with user class...

2017-05-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3867
  
@EronWright Here is a slightly modified version of your code: 
https://github.com/apache/flink/pull/3868

I added a small test to also validate the class loading.




[GitHub] flink pull request #3868: [FLINK-6532] [checkpoints] Ensure proper classload...

2017-05-10 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/3868

[FLINK-6532] [checkpoints] Ensure proper classloading for user-defined 
checkpoint hooks

This also adds a test that validates that the correct class loader is 
passed.

Note the neat trick via 
`CommonTestUtils.createObjectForClassNotInClassPath(classLoader)` to conjure up 
an object that is not in the test's class path.
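
Roughly, the test round-trip looks like the following sketch (the helper 
name is taken from the PR text; its package and exact signature are 
assumptions):

```scala
import org.apache.flink.core.testutils.CommonTestUtils
import org.apache.flink.util.InstantiationUtil

object ClassLoadingTestSketch {
  // serialize an object whose class exists only in `userClassLoader`, then
  // deserialize it again; resolving it via the system classloader would fail
  // with a ClassNotFoundException
  def roundTrip(userClassLoader: ClassLoader): AnyRef = {
    val obj = CommonTestUtils.createObjectForClassNotInClassPath(userClassLoader)
    val bytes = InstantiationUtil.serializeObject(obj)
    InstantiationUtil.deserializeObject[AnyRef](bytes, userClassLoader)
  }
}
```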

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink hook_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3868.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3868


commit 8f6e7ab9f1fa96c24dff44f75551fec968d657c6
Author: Stephan Ewen 
Date:   2017-05-10T20:03:49Z

[FLINK-6532] [checkpoints] Ensure proper classloading for user-defined 
checkpoint hooks






[jira] [Commented] (FLINK-6529) Rework the shading model in Flink

2017-05-10 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005350#comment-16005350
 ] 

Haohui Mai commented on FLINK-6529:
---

That makes sense as we are being hit by this problem quite hard (especially 
guava). I can take this up.

> Rework the shading model in Flink
> -
>
> Key: FLINK-6529
> URL: https://issues.apache.org/jira/browse/FLINK-6529
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Stephan Ewen
>Assignee: Haohui Mai
>Priority: Critical
>
> h2. Problem
> Currently, Flink shades dependencies like ASM and Guava into all jars of 
> projects that reference it and relocate the classes.
> There are some drawbacks to that approach, let's discuss them at the example 
> of ASM:
>   - The ASM classes are for example in {{flink-core}}, {{flink-java}}, 
> {{flink-scala}}, {{flink-runtime}}, etc.
>   - Users that reference these dependencies have the classes multiple times 
> in the classpath. That is unclean (works, though, because the classes are 
> identical). The same happens when building the final dist. jar.
>   - Some of these dependencies require to include license files in the shaded 
> jar. It is hard, if not impossible, to build a good automatic solution for that, 
> partly due to Maven's very poor cross-project path support
>   - Scala does not support shading really well. Scala classes have references 
> to classes in more places than just the class names (apparently for Scala 
> reflect support). Referencing a Scala project with shaded ASM still requires 
> to add a reference to unshaded ASM (at least as a compile dependency).
> h2. Proposal
> I propose that we build and deploy a {{asm-flink-shaded}} version of ASM and 
> directly program against the relocated namespaces. Since we never use classes 
> that we relocate in public interfaces, Flink users will never see the 
> relocated class names. Internally, it does not hurt to use them.
>   - Proper maven dependency management, no hidden (shaded) dependencies
>   - one copy of each dependency
>   - proper Scala interoperability
>   - no clumsy license management (license is in the deployed 
> {{asm-flink-shaded}})





[jira] [Assigned] (FLINK-6529) Rework the shading model in Flink

2017-05-10 Thread Haohui Mai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haohui Mai reassigned FLINK-6529:
-

Assignee: Haohui Mai

> Rework the shading model in Flink
> -
>
> Key: FLINK-6529
> URL: https://issues.apache.org/jira/browse/FLINK-6529
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Stephan Ewen
>Assignee: Haohui Mai
>Priority: Critical
>
> h2. Problem
> Currently, Flink shades dependencies like ASM and Guava into all jars of 
> projects that reference it and relocate the classes.
> There are some drawbacks to that approach, let's discuss them at the example 
> of ASM:
>   - The ASM classes are for example in {{flink-core}}, {{flink-java}}, 
> {{flink-scala}}, {{flink-runtime}}, etc.
>   - Users that reference these dependencies have the classes multiple times 
> in the classpath. That is unclean (works, though, because the classes are 
> identical). The same happens when building the final dist. jar.
>   - Some of these dependencies require to include license files in the shaded 
> jar. It is hard, if not impossible, to build a good automatic solution for that, 
> partly due to Maven's very poor cross-project path support
>   - Scala does not support shading really well. Scala classes have references 
> to classes in more places than just the class names (apparently for Scala 
> reflect support). Referencing a Scala project with shaded ASM still requires 
> to add a reference to unshaded ASM (at least as a compile dependency).
> h2. Proposal
> I propose that we build and deploy a {{asm-flink-shaded}} version of ASM and 
> directly program against the relocated namespaces. Since we never use classes 
> that we relocate in public interfaces, Flink users will never see the 
> relocated class names. Internally, it does not hurt to use them.
>   - Proper maven dependency management, no hidden (shaded) dependencies
>   - one copy of each dependency
>   - proper Scala interoperability
>   - no clumsy license management (license is in the deployed 
> {{asm-flink-shaded}})





[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005325#comment-16005325
 ] 

ASF GitHub Bot commented on FLINK-4499:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/2422
  
Hadoop has switched to spotbugs


> Introduce findbugs maven plugin
> ---
>
> Key: FLINK-4499
> URL: https://issues.apache.org/jira/browse/FLINK-4499
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Suneel Marthi
>
> As suggested by Stephan in FLINK-4482, this issue is to add 
> findbugs-maven-plugin into the build process so that we can detect lack of 
> proper locking and other defects automatically.
> We can begin with a small set of rules.





[GitHub] flink issue #2422: FLINK-4499: [WIP] Introduce findbugs maven plugin

2017-05-10 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/2422
  
Hadoop has switched to spotbugs




[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005283#comment-16005283
 ] 

ASF GitHub Bot commented on FLINK-4499:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2422
  
We would need INFRA to enable a daily cron build for Flink. How will we use 
FindBugs (which may be dead)? Is a goal to export the site artifacts to S3?


> Introduce findbugs maven plugin
> ---
>
> Key: FLINK-4499
> URL: https://issues.apache.org/jira/browse/FLINK-4499
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Suneel Marthi
>
> As suggested by Stephan in FLINK-4482, this issue is to add 
> findbugs-maven-plugin into the build process so that we can detect lack of 
> proper locking and other defects automatically.
> We can begin with a small set of rules.





[GitHub] flink issue #2422: FLINK-4499: [WIP] Introduce findbugs maven plugin

2017-05-10 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2422
  
We would need INFRA to enable a daily cron build for Flink. How will we use 
FindBugs (which may be dead)? Is a goal to export the site artifacts to S3?




[jira] [Commented] (FLINK-6531) Deserialize checkpoint hooks with user classloader

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005281#comment-16005281
 ] 

ASF GitHub Bot commented on FLINK-6531:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3867
  
Thanks for this patch, this is crucial!

There is a slightly different approach I would suggest, to not assume that 
the `Factory` is always in the core classpath. I have the code for that almost 
ready, plus a test. Can open a PR for your review shortly...


> Deserialize checkpoint hooks with user classloader
> --
>
> Key: FLINK-6531
> URL: https://issues.apache.org/jira/browse/FLINK-6531
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The checkpoint hooks introduced in FLINK-6390 aren't being deserialized with 
> the user classloader, breaking remote execution.
> Remote execution produces a `ClassNotFoundException` as the job graph is 
> transferred from the client to the JobManager.
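
As a sketch of the general pattern the fix needs — resolving classes against the user-code classloader during deserialization — the following is illustrative Java with made-up names, not Flink's actual utility classes:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Illustrative sketch: an ObjectInputStream that resolves classes against
// the user-code classloader first, falling back to default resolution for
// JDK and framework classes.
public class UserCodeObjectInputStream extends ObjectInputStream {

    private final ClassLoader userClassLoader;

    public UserCodeObjectInputStream(byte[] serialized, ClassLoader userClassLoader)
            throws IOException {
        super(new ByteArrayInputStream(serialized));
        this.userClassLoader = userClassLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            // Look the class up in the user classloader first, so classes
            // shipped only in the user jar (e.g. a checkpoint hook factory)
            // can be found.
            return Class.forName(desc.getName(), false, userClassLoader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default resolution.
            return super.resolveClass(desc);
        }
    }
}
```

Deserializing the serialized hooks with such a stream, handing it the classloader that loaded the user jar, lets hook classes that exist only in user code be resolved on the JobManager.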



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005279#comment-16005279
 ] 

ASF GitHub Bot commented on FLINK-6075:
---

Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3714
  
@fhueske Since at some point there were some other issues open on this, I just
wanted to ping you on the right one :)



> Support Limit/Top(Sort) for Stream SQL
> --
>
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: radu
>  Labels: features
> Attachments: sort.png
>
>
> These will be split into 3 separate JIRA issues. However, the design is the
> same for all of them; only the processing function differs in terms of the
> output.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL
> '3' HOUR) ORDER BY b`
> Comment: the window is defined using GROUP BY
> Comment: the ASC or DESC keyword can be appended to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: the window is defined using time ranges in the WHERE clause
> Comment: the window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING
> LIMIT 10) FROM stream1`
> Comment: limit over the contents of the sliding window
> General comments:
> - All these SQL clauses are supported only over windows (bounded collections
> of data).
> - Each of the 3 operators will be supported with each of the ways of
> expressing windows.
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior, as they all
> require a sorted collection of the data on which the logic will be applied
> (i.e., select a subset of the items or the entire sorted set). In a
> streaming context these functions make sense only within a window. Without
> a defined window the functions could never emit, as the sort operation
> would never trigger. If an SQL query is provided without window bounds, an
> error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR).
> Although not targeted by this JIRA, when working in event time order, the
> retraction and lateness mechanisms of windows can be used to deal with
> out-of-order events and retractions/updates of results.
> **Functionality example**
> We exemplify with the query below for all 3 types of operators (sort, limit
> and top). Rowtime indicates when the HOP window triggers, which can be
> observed in the fact that outputs are generated only at those moments. The
> HOP windows trigger every hour (on the hour) and each event contributes to
> (is duplicated across) 2 consecutive one-hour intervals. Proctime indicates
> the processing time at which a new event arrives in the system. Events are
> of the type (a, b), with the ordering applied on the b field.
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
> ORDER BY b (LIMIT 2 / TOP 2 / [ASC/DESC])`
> ||Rowtime||Proctime||Stream1||Limit 2||Top 2||Sort [ASC]||
> | |10:00:00|(aaa, 11)| | | |
> | |10:05:00|(aab, 7)| | | |
> |10-11|11:00:00| |aab,aaa|aab,aaa|aab,aaa|
> | |11:03:00|(aac, 21)| | | |
> |11-12|12:00:00| |aab,aaa|aab,aaa|aab,aaa,aac|
> | |12:10:00|(abb, 12)| | | |
> | |12:15:00|(abb, 12)| | | |
> |12-13|13:00:00| |abb,abb|abb,abb|abb,abb,aac|
> |...| | | | | |
> **Implementation option**
> Considering that the SQL operators will be associated with window
> boundaries, the functionality will be implemented within the logic of the
> window as follows:
> * Window assigner – selected based on the type of window used in the SQL
> query (TUMBLING, SLIDING, ...)
> * Evictor/Trigger – a time- or count-based evictor derived from the
> definition of the window boundaries
> * Apply – a window function that sorts the data and selects the output to
> emit (based on the LIMIT/TOP parameters); all data is sorted at once and
> the result is output when the window triggers (see the sketch after this
> description)
> An alternative implementation can be to use a fold window function to sort
> the elements as they arrive, one at a time, followed by a flatMap
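
To make the Apply step above concrete, here is a minimal, hypothetical sketch against the DataStream API (the class name and tuple layout are my own choices, not the PR's code): buffer the window contents, sort them once when the window fires, and emit at most LIMIT elements.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical sketch of the "Apply" step: events are (a, b) pairs and the
// ordering is applied on the b field; when the window triggers, all buffered
// elements are sorted at once and the first `limit` are emitted.
public class SortAndLimitWindowFunction
        implements AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

    private final int limit;

    public SortAndLimitWindowFunction(int limit) {
        this.limit = limit;
    }

    @Override
    public void apply(TimeWindow window,
                      Iterable<Tuple2<String, Integer>> values,
                      Collector<Tuple2<String, Integer>> out) {
        // Buffer the full window contents; sorting happens only on trigger.
        List<Tuple2<String, Integer>> buffer = new ArrayList<>();
        for (Tuple2<String, Integer> value : values) {
            buffer.add(value);
        }
        // Ascending order on b; reverse the comparator for DESC.
        buffer.sort(Comparator.comparingInt((Tuple2<String, Integer> t) -> t.f1));
        // Emit at most `limit` elements (LIMIT / TOP semantics).
        for (int i = 0; i < Math.min(limit, buffer.size()); i++) {
            out.collect(buffer.get(i));
        }
    }
}
```

Wired up with something like `stream.windowAll(SlidingProcessingTimeWindows.of(Time.hours(2), Time.hours(1))).apply(new SortAndLimitWindowFunction(2))`, this mirrors the Limit 2 column of the table above.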

