[jira] [Commented] (FLINK-6491) Add QueryConfig to specify state retention time for streaming queries
[ https://issues.apache.org/jira/browse/FLINK-6491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005933#comment-16005933 ] ASF GitHub Bot commented on FLINK-6491: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3863 Hi @fhueske, thanks a lot for your review. I have updated the PR with the following changes: * Unified the naming: changed all `qConfig` and `qConf` to `queryConfig`. * Added a warning log to `DataStreamOverAggregate`. * Fixed the cleanup logic. * Extended the test cases to cover the corner cases of the cleanup logic. There are two things I have not done: * `check for processing time over range windows that minIdleStateRetentionTime > preceding interval.` I do not think we need to add this check, because both row-based and time-based windows can produce incorrect results due to state cleanup; the important thing is to make users aware of what the cleanup config does. * `remove the registerEventCleanupTimer method` The reason I added this method is that in a row-time OVER window (TimeDomain.EVENT_TIME), `ctx.timerService.currentProcessingTime` always returns `0`. What do you think? Thanks, SunJincheng

> Add QueryConfig to specify state retention time for streaming queries > - > > Key: FLINK-6491 > URL: https://issues.apache.org/jira/browse/FLINK-6491 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.3.0 > Reporter: Fabian Hueske > Assignee: sunjincheng > Priority: Critical > > By now we have a couple of streaming operators (group-windows, over-windows, > non-windowed aggregations) that require operator state. Since state is not > automatically cleaned up by Flink, we need to add a mechanism to configure a > state retention time. > If configured, a query will retain state for a specified period of state > inactivity. If state is not accessed within this period of time, it will be > cleared. > I propose to add two parameters for this, a min and a max retention > time. The min retention time specifies the earliest time and the max > retention time the latest time when state is cleared. The reasoning for > having two parameters is that we can avoid registering many timers if we have > more freedom about when to discard state. > This issue also introduces a QueryConfig object which can be passed to a > streaming query when it is emitted to a TableSink or converted to a > DataStream (append or retraction). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
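The min/max retention scheme described in the issue can be sketched as a small piece of timer bookkeeping, independent of Flink. This is a hedged illustration (class and method names are invented for the sketch, not Flink APIs): a new cleanup timer is registered only when the earliest allowed cleanup time has moved past the last registered timer by at least the max-min interval, which bounds how many timers get created for a busy key.

```java
// Sketch of min/max idle-state-retention timer coalescing (hypothetical names,
// mirroring the idea in FLINK-6491; not actual Flink code).
public class CleanupTimerSketch {
    private final long minRetentionTime;      // earliest idle time before state may be cleared
    private final long cleanupTimerInterval;  // max - min: slack used to coalesce timers
    private Long lastCleanupTime;             // latest registered cleanup timer, null if none
    private long registeredTimers;            // counts how many timers were actually registered

    public CleanupTimerSketch(long minRetentionTime, long maxRetentionTime) {
        this.minRetentionTime = minRetentionTime;
        this.cleanupTimerInterval = maxRetentionTime - minRetentionTime;
    }

    /** Called on every state access; registers a timer only when necessary. */
    public void onStateAccess(long currentTime) {
        long earliestCleanup = currentTime + minRetentionTime;
        if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
            lastCleanupTime = earliestCleanup + cleanupTimerInterval;
            registeredTimers++; // in Flink this would register a processing-time timer
        }
    }

    public long getRegisteredTimers() { return registeredTimers; }
    public Long getLastCleanupTime() { return lastCleanupTime; }
}
```

With min=10 and max=30, repeated accesses within the slack window reuse the already-registered timer instead of creating new ones.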
[jira] [Commented] (FLINK-6491) Add QueryConfig to specify state retention time for streaming queries
[ https://issues.apache.org/jira/browse/FLINK-6491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005922#comment-16005922 ] ASF GitHub Bot commented on FLINK-6491: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3863#discussion_r115908788 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -239,6 +248,7 @@ class DataStreamOverAggregate( } def createBoundedAndCurrentRowOverWindow( --- End diff -- I do not think we need to add this check, because both `row-based` and `time-based` windows can produce incorrect results due to state cleanup; the important thing is to make users aware of what the `cleanup config` does.
[jira] [Commented] (FLINK-6491) Add QueryConfig to specify state retention time for streaming queries
[ https://issues.apache.org/jira/browse/FLINK-6491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005913#comment-16005913 ] ASF GitHub Bot commented on FLINK-6491: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3863#discussion_r115907975 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.state.State
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+
+abstract class ProcessFunctionWithCleanupState[IN, OUT](qConfig: StreamQueryConfig)
+  extends ProcessFunction[IN, OUT] {
+
+  protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
+  // interval in which clean-up timers are registered
+  protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
+
+  // holds the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected def initCleanupTimeState(stateName: String) {
+    if (stateCleaningEnabled) {
+      val inputCntDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong](stateName, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+    }
+  }
+
+  protected def registerProcessingCleanupTimer(
+    ctx: ProcessFunction[IN, OUT]#Context,
+    currentTime: Long): Unit = {
+    if (stateCleaningEnabled) {
+
+      val earliestCleanup = currentTime + minRetentionTime
+
+      // last registered timer
+      val lastCleanupTime = cleanupTimeState.value()
+
+      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
+        // we need to register a new timer
+        val cleanupTime = earliestCleanup + cleanupTimerInterval
+        // register timer and remember clean-up time
+        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+        cleanupTimeState.update(cleanupTime)
+      }
+    }
+  }
+
+  protected def registerEventCleanupTimer( --- End diff -- The reason I added this method is that in a row-time OVER window (TimeDomain.EVENT_TIME), `ctx.timerService.currentProcessingTime` always returns `0`.
[jira] [Created] (FLINK-6532) Mesos version check
Eron Wright created FLINK-6532: --- Summary: Mesos version check Key: FLINK-6532 URL: https://issues.apache.org/jira/browse/FLINK-6532 Project: Flink Issue Type: Improvement Components: Mesos Reporter: Eron Wright The minimum requirement for the Mesos subsystem of Flink is 1.0. We should enforce the requirement with a version check upon connection. This may be accomplished by checking the 'version' property of the 'MesosInfo' structure.
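How the version string is obtained from the master's 'MesosInfo' depends on the client library, so the sketch below only shows the comparison logic for a "require at least 1.0" check. The class and method names are invented for illustration, and it assumes plain numeric dotted versions (no suffixes like "-rc1"):

```java
// Minimal sketch of a "require Mesos >= 1.0" check. Only the version
// comparison is shown; fetching the version from MesosInfo is library-specific.
public class MesosVersionCheck {
    /** Returns true if 'version' (e.g. "1.2.0") is at least 'required' (e.g. "1.0"). */
    public static boolean atLeast(String version, String required) {
        String[] v = version.split("\\.");
        String[] r = required.split("\\.");
        int n = Math.max(v.length, r.length);
        for (int i = 0; i < n; i++) {
            // missing components count as 0, so "1.0" == "1.0.0"
            int vi = i < v.length ? Integer.parseInt(v[i]) : 0;
            int ri = i < r.length ? Integer.parseInt(r[i]) : 0;
            if (vi != ri) return vi > ri;
        }
        return true; // equal versions
    }
}
```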
[jira] [Commented] (FLINK-6502) Add support ElasticsearchSink for DataSet
[ https://issues.apache.org/jira/browse/FLINK-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005806#comment-16005806 ] ASF GitHub Bot commented on FLINK-6502: --- GitHub user 397090770 opened a pull request: https://github.com/apache/flink/pull/3869 [FLINK-6502] Add support ElasticsearchSink for DataSet Currently, Flink only supports writing `DataStream` data to ElasticSearch through `ElasticsearchSink`; it would be very useful if Flink also supported writing `DataSet` data to ElasticSearch. See [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ElasticsearchSink-on-DataSet-td12980.html](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ElasticsearchSink-on-DataSet-td12980.html) - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/397090770/flink FLINK-6502 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3869.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3869 commit a58ce4bcd86aae000c8308a66ca5c74a1812fb46 Author: yangping.wu Date: 2017-05-11T02:14:03Z [FLINK-6502] Add support ElasticsearchSink for DataSet

> Add support ElasticsearchSink for DataSet > - > > Key: FLINK-6502 > URL: https://issues.apache.org/jira/browse/FLINK-6502 > Project: Flink > Issue Type: New Feature > Components: DataSet API > Affects Versions: 1.2.1 > Reporter: wyp > > Currently, Flink only supports writing data in DataStream to ElasticSearch > through {{ElasticsearchSink}}. We should be able to write data in DataSet > to ElasticSearch too. See > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ElasticsearchSink-on-DataSet-td12980.html]
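A DataSet sink for Elasticsearch would typically be built as a batch OutputFormat that buffers records and issues bulk requests. The buffering core can be sketched independently of both Flink and the Elasticsearch client; everything here (class names, the `flusher` callback standing in for a bulk-index request) is illustrative, not actual Flink or Elasticsearch API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the buffering core a DataSet Elasticsearch sink would need:
// collect records and hand them to a bulk-flush callback in fixed-size chunks.
public class BulkBufferSketch<T> {
    private final int bulkSize;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    public BulkBufferSketch(int bulkSize, Consumer<List<T>> flusher) {
        this.bulkSize = bulkSize;
        this.flusher = flusher;
    }

    /** Rough equivalent of OutputFormat#writeRecord. */
    public void write(T record) {
        buffer.add(record);
        if (buffer.size() >= bulkSize) {
            flush();
        }
    }

    /** Rough equivalent of OutputFormat#close: flush the remainder. */
    public void close() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        flusher.accept(new ArrayList<>(buffer)); // defensive copy for the callback
        buffer.clear();
    }
}
```

With `bulkSize = 2`, writing three records produces one full batch of two and, on close, a final batch of one.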
[jira] [Updated] (FLINK-6502) Add Support ElasticsearchSink for DataSet
[ https://issues.apache.org/jira/browse/FLINK-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wyp updated FLINK-6502: --- Summary: Add Support ElasticsearchSink for DataSet (was: Support writing data in DataSet to ElasticSearch)
[jira] [Updated] (FLINK-6502) Add support ElasticsearchSink for DataSet
[ https://issues.apache.org/jira/browse/FLINK-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wyp updated FLINK-6502: --- Summary: Add support ElasticsearchSink for DataSet (was: Add Support ElasticsearchSink for DataSet)
[jira] [Commented] (FLINK-6491) Add QueryConfig to specify state retention time for streaming queries
[ https://issues.apache.org/jira/browse/FLINK-6491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005751#comment-16005751 ] ASF GitHub Bot commented on FLINK-6491: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3863#discussion_r115888057 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -81,6 +81,8 @@ abstract class StreamTableEnvironment( // the naming pattern for internally registered tables. private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r + def qConf: StreamQueryConfig = new StreamQueryConfig --- End diff -- Yes, I want to change all `qConfig` and `qConf` to `queryConfig`.
[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-4534: -- Description: Iteration over state.bucketStates is protected by synchronization in other methods, except for the following in restoreState(): {code} for (BucketState bucketState : state.bucketStates.values()) { {code} and the following in close(): {code} for (Map.Entry entry : state.bucketStates.entrySet()) { closeCurrentPartFile(entry.getValue()); {code} W.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue starting at line 752: {code} Set pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); LOG.debug("Moving pending files to final location on restore."); for (Long pastCheckpointId : pastCheckpointIds) { {code}

> Lack of synchronization in BucketingSink#restoreState() > --- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors > Reporter: Ted Yu
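The fix the issue calls for — iterating over the shared map only while holding the same lock the mutating methods use — can be illustrated with a generic sketch. The types here are stand-ins, not the actual BucketingSink classes:

```java
import java.util.HashMap;
import java.util.Map;

// Generic sketch of the fix: iterate over the shared map under the same lock
// the mutating methods use. 'bucketStates' stands in for BucketingSink's
// State.bucketStates; the real classes are not reproduced here.
public class SynchronizedIterationSketch {
    private final Map<String, Long> bucketStates = new HashMap<>();

    public void put(String bucket, long value) {
        synchronized (bucketStates) {
            bucketStates.put(bucket, value);
        }
    }

    /** Safe equivalent of restoreState()/close(): iteration under the lock. */
    public long sumUnderLock() {
        long sum = 0;
        synchronized (bucketStates) { // guards against concurrent modification
            for (Map.Entry<String, Long> entry : bucketStates.entrySet()) {
                sum += entry.getValue();
            }
        }
        return sum;
    }
}
```

Without the `synchronized` block around the loop, a concurrent `put` could trigger a `ConcurrentModificationException` or let the iteration observe a partially updated map.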
[jira] [Updated] (FLINK-5376) Misleading log statements in UnorderedStreamElementQueue
[ https://issues.apache.org/jira/browse/FLINK-5376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5376: -- Description: The following are two examples where ordered stream element queue is mentioned: {code} LOG.debug("Put element into ordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity); return true; } else { LOG.debug("Failed to put element into ordered stream element queue because it " + {code} I guess OrderedStreamElementQueue was coded first.

> Misleading log statements in UnorderedStreamElementQueue > > > Key: FLINK-5376 > URL: https://issues.apache.org/jira/browse/FLINK-5376 > Project: Flink > Issue Type: Bug > Components: DataStream API > Reporter: Ted Yu > Assignee: Chesnay Schepler > Priority: Minor
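One low-tech way to avoid this class of copy-paste bug — offered as a sketch, not the actual Flink fix — is to derive the queue name used in log text from the class itself rather than hard-coding "ordered stream element queue":

```java
// Sketch: build the log text from the class name, so a copied
// UnorderedStreamElementQueue cannot accidentally log "ordered ... queue".
public class LogNameSketch {
    public static String putMessage(Object queue, int numberEntries, int capacity) {
        return String.format("Put element into %s. New filling degree (%d/%d).",
                queue.getClass().getSimpleName(), numberEntries, capacity);
    }
}
```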
[jira] [Comment Edited] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822346#comment-15822346 ] Ted Yu edited comment on FLINK-5486 at 5/11/17 1:04 AM: Lock on State.bucketStates should be held in the following method: {code} private void handleRestoredBucketState(State restoredState) { Preconditions.checkNotNull(restoredState); for (BucketState bucketState : restoredState.bucketStates.values()) { {code}

> Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors > Reporter: Ted Yu > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared.
[jira] [Updated] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase
[ https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-6105: -- Description: When catching InterruptedException, we should throw InterruptedIOException instead of IOException. The following example is from HadoopInputFormatBase: {code} try { splits = this.mapreduceInputFormat.getSplits(jobContext); } catch (InterruptedException e) { throw new IOException("Could not get Splits.", e); } {code} There may be other places where IOE is thrown.

> Properly handle InterruptedException in HadoopInputFormatBase > - > > Key: FLINK-6105 > URL: https://issues.apache.org/jira/browse/FLINK-6105 > Project: Flink > Issue Type: Bug > Components: DataStream API > Reporter: Ted Yu
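The suggested change can be sketched as follows. `fetchSplits` is a hypothetical stand-in for the `getSplits` call, and the pattern shown (restore the interrupt flag, throw `InterruptedIOException` with the original cause) is the conventional one for translating interruption into an I/O failure:

```java
import java.io.IOException;
import java.io.InterruptedIOException;

// Sketch of the proposed handling: translate InterruptedException into
// InterruptedIOException (a subclass of IOException), keep the cause, and
// restore the thread's interrupt status. 'fetchSplits' is a hypothetical
// stand-in for mapreduceInputFormat.getSplits(jobContext).
public class InterruptHandlingSketch {
    public static int fetchSplits() throws IOException {
        try {
            if (Thread.interrupted()) {          // simulate getSplits() being interrupted
                throw new InterruptedException();
            }
            return 4;                            // pretend we computed 4 splits
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve interrupt status for callers
            InterruptedIOException ioe = new InterruptedIOException("Could not get Splits.");
            ioe.initCause(e);
            throw ioe;                           // still an IOException, but distinguishable
        }
    }
}
```

Because `InterruptedIOException` extends `IOException`, existing `catch (IOException e)` blocks keep working, while callers that care about interruption can now detect it.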
[jira] [Updated] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()
[ https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5541: -- Description: {code} if (localJar == null) { try { for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment()) .getJars()) { // TODO verify that there is only one jar localJar = new File(url.toURI()).getAbsolutePath(); } } catch (final URISyntaxException e) { // ignore } catch (final ClassCastException e) { // ignore } } logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf); client.submitTopologyWithOpts(name, localJar, topology); {code} Since the try block may encounter URISyntaxException / ClassCastException, we should check that localJar is not null before calling submitTopologyWithOpts().
> Missing null check for localJar in FlinkSubmitter#submitTopology() > -- > > Key: FLINK-5541 > URL: https://issues.apache.org/jira/browse/FLINK-5541 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Ted Yu >Priority: Minor > > {code} > if (localJar == null) { > try { > for (final URL url : ((ContextEnvironment) > ExecutionEnvironment.getExecutionEnvironment()) > .getJars()) { > // TODO verify that there is only one jar > localJar = new File(url.toURI()).getAbsolutePath(); > } > } catch (final URISyntaxException e) { > // ignore > } catch (final ClassCastException e) { > // ignore > } > } > logger.info("Submitting topology " + name + " in distributed mode with > conf " + serConf); > client.submitTopologyWithOpts(name, localJar, topology); > {code} > Since the try block may encounter URISyntaxException / ClassCastException, we > should check that localJar is not null before calling > submitTopologyWithOpts().
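The missing check can be sketched like this. `SubmitGuard.submit` is a hypothetical stand-in for the code path around `client.submitTopologyWithOpts()`: because the catch blocks in the quoted code swallow `URISyntaxException` / `ClassCastException`, `localJar` can still be null there, so the fix is to fail fast with a clear message instead of passing a null jar path downstream:

```java
public class SubmitGuard {

    // Hypothetical stand-in for FlinkSubmitter#submitTopology(): if the jar
    // could not be resolved from the execution environment, localJar stays
    // null, so guard before submitting.
    static String submit(String name, String localJar) {
        if (localJar == null) {
            throw new IllegalArgumentException(
                "No jar file found for topology '" + name
                    + "'; cannot call submitTopologyWithOpts().");
        }
        return "submitted " + name + " with " + localJar;
    }
}
```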
[jira] [Commented] (FLINK-5488) yarnClient should be closed in AbstractYarnClusterDescriptor for error conditions
[ https://issues.apache.org/jira/browse/FLINK-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005742#comment-16005742 ] Ted Yu commented on FLINK-5488: --- I agree. > yarnClient should be closed in AbstractYarnClusterDescriptor for error > conditions > - > > Key: FLINK-5488 > URL: https://issues.apache.org/jira/browse/FLINK-5488 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Ted Yu >Assignee: Fang Yong > > Here is one example: > {code} > if (jobManagerMemoryMb > maxRes.getMemory()) { > failSessionDuringDeployment(yarnClient, yarnApplication); > throw new YarnDeploymentException("The cluster does not have the > requested resources for the JobManager available!\n" > + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + > jobManagerMemoryMb + "MB. " + NOTE); > } > {code} > yarnClient implements Closeable. > It should be closed in situations where an exception is thrown.
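One way to guarantee the client is released on every error path is a success flag with a finally block. `StubYarnClient` and `deploy` below are hypothetical stand-ins for `YarnClient` and the deployment logic in `AbstractYarnClusterDescriptor`, shown only to illustrate the cleanup pattern:

```java
import java.io.Closeable;

public class DeployGuard {

    // Hypothetical Closeable client standing in for YarnClient.
    static class StubYarnClient implements Closeable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    static void deploy(StubYarnClient yarnClient, int requestedMb, int maxMb) {
        boolean success = false;
        try {
            if (requestedMb > maxMb) {
                // Corresponds to the YarnDeploymentException in the example above.
                throw new IllegalStateException(
                    "The cluster does not have the requested resources"
                        + " for the JobManager available!");
            }
            success = true;
        } finally {
            if (!success) {
                yarnClient.close(); // release the client on every error path
            }
        }
    }
}
```

On success the client stays open for further use; on any exception (including ones added later) the finally block closes it, so no individual throw site has to remember the cleanup.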
[jira] [Commented] (FLINK-6332) Upgrade Scala version to 2.11.11
[ https://issues.apache.org/jira/browse/FLINK-6332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005740#comment-16005740 ] Ted Yu commented on FLINK-6332: --- We can wait for a newer Scala release with more visible benefits. > Upgrade Scala version to 2.11.11 > > > Key: FLINK-6332 > URL: https://issues.apache.org/jira/browse/FLINK-6332 > Project: Flink > Issue Type: Improvement > Components: Build System, Scala API >Reporter: Ted Yu >Priority: Minor > > Currently the scala-2.11 profile uses Scala 2.11.7. > 2.11.11 is the most recent version. > This issue is to upgrade to Scala 2.11.11.
[jira] [Commented] (FLINK-6414) Use scala.binary.version in place of change-scala-version.sh
[ https://issues.apache.org/jira/browse/FLINK-6414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005717#comment-16005717 ] ASF GitHub Bot commented on FLINK-6414: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3800 @StephanEwen does the recent discussion on packaging shaded dependencies mean that `force-shading` will go away? Or do we keep that even if dependencies are no longer being shaded? Or will only some shaded dependencies (the most problematic) be provided? > Use scala.binary.version in place of change-scala-version.sh > > > Key: FLINK-6414 > URL: https://issues.apache.org/jira/browse/FLINK-6414 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > Recent commits have failed to modify {{change-scala-version.sh}} resulting in > broken builds for {{scala-2.11}}. It looks like we can remove the need for > this script by replacing hard-coded references to the Scala version with > Flink's maven variable {{scala.binary.version}}. > I had initially realized that the change script is [only used for > building|https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/building.html#scala-versions] > and not for switching the IDE environment.
[GitHub] flink issue #3800: [FLINK-6414] [build] Use scala.binary.version in place of...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3800 @StephanEwen does the recent discussion on packaging shaded dependencies mean that `force-shading` will go away? Or do we keep that even if dependencies are no longer being shaded? Or will only some shaded dependencies (the most problematic) be provided? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6529) Rework the shading model in Flink
[ https://issues.apache.org/jira/browse/FLINK-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005710#comment-16005710 ] Greg Hogan commented on FLINK-6529: --- [~wheat9], I appreciate your enthusiasm and just wanted to make sure you have followed the discussion on the mailing list. I believe these modules will go in a separate Flink repo so you'll want to coordinate with [~StephanEwen], especially if the goal is to have this in for 1.3. > Rework the shading model in Flink > - > > Key: FLINK-6529 > URL: https://issues.apache.org/jira/browse/FLINK-6529 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.3.0, 1.2.1 >Reporter: Stephan Ewen >Assignee: Haohui Mai >Priority: Critical > > h2. Problem > Currently, Flink shades dependencies like ASM and Guava into all jars of > projects that reference them and relocates the classes. > There are some drawbacks to that approach; let's discuss them using ASM as an > example: > - The ASM classes are for example in {{flink-core}}, {{flink-java}}, > {{flink-scala}}, {{flink-runtime}}, etc. > - Users that reference these dependencies have the classes multiple times > in the classpath. That is unclean (it works, though, because the classes are > identical). The same happens when building the final dist. jar. > - Some of these dependencies require including license files in the shaded > jar. It is hard, if not impossible, to build a good automatic solution for that, > partly due to Maven's very poor cross-project path support. > - Scala does not support shading really well. Scala classes have references > to classes in more places than just the class names (apparently for Scala > reflect support). Referencing a Scala project with shaded ASM still requires > adding a reference to unshaded ASM (at least as a compile dependency). > h2. Proposal > I propose that we build and deploy an {{asm-flink-shaded}} version of ASM and > directly program against the relocated namespaces. 
Since we never use classes > that we relocate in public interfaces, Flink users will never see the > relocated class names. Internally, it does not hurt to use them. > - Proper Maven dependency management, no hidden (shaded) dependencies > - One copy of each dependency > - Proper Scala interoperability > - No clumsy license management (license is in the deployed > {{asm-flink-shaded}})
[GitHub] flink issue #3866: A refactor to avoid cloned code in try-catch blocks (usin...
Github user rbonifacio commented on the issue: https://github.com/apache/flink/pull/3866 @greghogan I changed the files, introducing the missing spaces after catches. Thanks!
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005634#comment-16005634 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115871238 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util +import java.util.{List => JList} + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to support stream join stream, currently just support inner-join + * + * @param leftStreamWindowSizethe left stream window size + * @param rightStreamWindowSizethe right stream window size + * @param element1Type the input type of left stream + * @param element2Type the input type of right stream + * @param filterFuncthe function of other non-equi condition include time condition + * + */ +class ProcTimeInnerJoin( + private val leftStreamWindowSize: Long, + private val rightStreamWindowSize: Long, + private val element1Type: TypeInformation[CRow], + private val element2Type: TypeInformation[CRow], + private val filterFunc: RichFilterFunction[Row]) + extends CoProcessFunction[CRow, CRow, CRow] { + + private var outputC: CRow = _ + private var listToRemove: JList[Long] = _ + + /** state to hold left stream element **/ + private var row1MapState: MapState[Long, JList[Row]] = _ + /** state to hold right stream element **/ + private var row2MapState: MapState[Long, JList[Row]] = _ + + /** state to record last timer of left stream, 0 means no timer **/ + private var timerState1: ValueState[Long] = _ + /** state to record last timer of right stream, 0 means no timer **/ + private var timerState2: ValueState[Long] = _ + + + override def open(config: Configuration) { +outputC = new CRow(new Row(element1Type.getArity + 
element2Type.getArity), true) +filterFunc.setRuntimeContext(getRuntimeContext) +filterFunc.open(config) + +listToRemove = new util.ArrayList[Long]() + +// initialize row state +val rowListTypeInfo1: TypeInformation[JList[Row]] = +new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row1mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1) +row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1) + +val rowListTypeInfo2: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row2mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2) +row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2) + +// initialize timer state +val valueStateDescriptor1: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long]) +timerState1 =
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005644#comment-16005644 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115874289 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -698,6 +698,12 @@ class CodeGenerator( s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)", List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;", s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;")) + } else if (clazz == classOf[FilterFunction[_]]) { --- End diff -- Not needed if we use `JoinFunction` instead of `FilterFunction` > Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 > URL: https://issues.apache.org/jira/browse/FLINK-6232 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only supports inner joins. > * The ON clause should include an equi-join condition. > * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + > INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The > time condition only supports bounded time ranges like {{o.proctime BETWEEN > s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; unbounded > conditions like {{o.proctime > s.proctime}} are not supported. The condition must > include both streams' proctime attributes; {{o.proctime between proctime() and > proctime() + 1}} should also not be supported. > This issue includes: > * Design of the DataStream operator to deal with the stream join > * Translation from Calcite's RelNode representation (LogicalJoin).
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005645#comment-16005645 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115866096 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- (diff context identical to the previous comment on this file)
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005649#comment-16005649 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115873645 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- (diff context identical to the previous comment on this file)
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005641#comment-16005641 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115855446 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamTableEnvironment, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +/** + * Flink RelNode which matches along with JoinOperator and its related operations. + */ +class DataStreamJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + joinNode: FlinkLogicalJoin, + leftSchema: RowSchema, + schema: RowSchema, + ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with CommonJoin + with DataStreamRel { + + override def deriveRowType() = schema.logicalType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinNode, + leftSchema, + schema, + ruleDescription) + } + + override def toString: String = { + +s"${joinTypeToString(joinNode.getJoinType)}" + + s"(condition: (${joinConditionToString(schema.logicalType, +joinNode.getCondition, getExpressionString)}), " + + s"select: (${joinSelectionToString(schema.logicalType)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("condition", joinConditionToString(schema.logicalType, 
+joinNode.getCondition, getExpressionString)) + .item("select", joinSelectionToString(schema.logicalType)) + .item("joinType", joinTypeToString(joinNode.getJoinType)) + } + + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + +val config = tableEnv.getConfig + +// get the equality keys and other condition +val (leftKeys, rightKeys, otherCondition) = + JoinUtil.analyzeJoinCondition(joinNode, getExpressionString) + +if (left.isInstanceOf[StreamTableSourceScan] +|| right.isInstanceOf[StreamTableSourceScan]) { + throw new TableException( +"Join between stream and table is not supported yet.") +} +// analyze time boundary and time predicate type(proctime/rowtime) +val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) = + JoinUtil.analyzeTimeBoundary( +otherCondition, +leftSchema.logicalType.getFieldCount, +leftSchema.physicalType.getFieldCount, +schema.logicalType, +joinNode.getCluster.getRexBuilder, +config) + +val leftDataStream =
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005636#comment-16005636 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115864221 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- (diff context identical to the previous comment on this file)
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005632#comment-16005632 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115847719 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamTableEnvironment, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +/** + * Flink RelNode which matches along with JoinOperator and its related operations. + */ +class DataStreamJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + joinNode: FlinkLogicalJoin, --- End diff -- please do not include a `FlinkLogicalJoin` node but the condition and the join type. > Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 > URL: https://issues.apache.org/jira/browse/FLINK-6232 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only support inner join > * The ON clause should include equi-join condition > * The time-condition {{o.proctime BETWEEN s.proctime AND s.proctime + > INTERVAL '1' HOUR}} only can use proctime that is a system attribute, the > time condition only support bounded time range like {{o.proctime BETWEEN > s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not > support unbounded like {{o.proctime > s.protime}}, and should include both > two stream's proctime attribute, {{o.proctime between proctime() and > proctime() + 1}} should also not be supported. > This issue includes: > * Design of the DataStream operator to deal with stream join > * Translation from Calcite's RelNode representation (LogicalJoin). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
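The bounded time condition above means an Orders row joins only Shipments rows whose processing time lies in a fixed interval around its own. A simplified, Flink-free sketch of that semantics (hypothetical names; plain Java maps keyed by timestamp stand in for Flink state, and the bounds are generic `lower`/`upper` offsets rather than the SQL `INTERVAL`):

```java
import java.util.*;

// Simplified model of a bounded proc-time inner join: a left row with timestamp lt
// matches right rows whose timestamp rt satisfies lt - lower <= rt <= lt + upper.
class BoundedIntervalJoin {
    static List<String> join(TreeMap<Long, List<String>> left,
                             TreeMap<Long, List<String>> right,
                             long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, List<String>> l : left.entrySet()) {
            long lt = l.getKey();
            // Only right buckets inside [lt - lower, lt + upper] can match.
            for (Map.Entry<Long, List<String>> r :
                     right.subMap(lt - lower, true, lt + upper, true).entrySet()) {
                for (String lv : l.getValue())
                    for (String rv : r.getValue())
                        out.add(lv + "|" + rv);
            }
        }
        return out;
    }
}
```

Using `TreeMap.subMap` to probe only the in-range buckets mirrors why the operator buckets rows by arrival time instead of keeping one flat list.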
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005623#comment-16005623 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115845838 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin + +import org.apache.calcite.rel.core.JoinRelType +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema + +class DataStreamJoinRule + extends ConverterRule( + classOf[FlinkLogicalJoin], + FlinkConventions.LOGICAL, + FlinkConventions.DATASTREAM, + "DataStreamJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] + +val joinInfo = join.analyzeCondition + +// joins require an equi-condition or a conjunctive predicate with at least one equi-condition +// and disable outer joins with non-equality predicates +!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER) + } + + override def convert(rel: RelNode): RelNode = { + +val join: FlinkLogicalJoin = rel.asInstanceOf[FlinkLogicalJoin] +val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM) +val convLeft: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.DATASTREAM) +val convRight: RelNode = RelOptRule.convert(join.getInput(1), FlinkConventions.DATASTREAM) + +new DataStreamJoin( + rel.getCluster, + traitSet, + convLeft, + convRight, + join, --- End diff -- please pass join condition and join type instead of the `LogicalFlinkJoin`. This is a plan node of the logical plan that should not be part of a node in the physical plan. 
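The `matches` method in the rule above accepts a join only if there is at least one equi-key pair, and tolerates a non-equi residual predicate only for inner joins. That predicate can be isolated as a standalone sketch (hypothetical minimal types; Calcite's `JoinInfo`/`JoinRelType` are reduced to a key-pair count and an enum):

```java
// Sketch of DataStreamJoinRule.matches: require at least one equi-join key pair,
// and allow non-equi residual predicates only for INNER joins.
class JoinRuleCheck {
    enum JoinType { INNER, LEFT, RIGHT, FULL }

    static boolean matches(int equiKeyPairs, boolean isEquiOnly, JoinType type) {
        // Mirrors: !joinInfo.pairs().isEmpty && (joinInfo.isEqui || joinType == INNER)
        return equiKeyPairs > 0 && (isEquiOnly || type == JoinType.INNER);
    }
}
```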
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005630#comment-16005630 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115852788 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamTableEnvironment, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +/** + * Flink RelNode which matches along with JoinOperator and its related operations. + */ +class DataStreamJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + joinNode: FlinkLogicalJoin, + leftSchema: RowSchema, + schema: RowSchema, + ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with CommonJoin + with DataStreamRel { + + override def deriveRowType() = schema.logicalType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinNode, + leftSchema, + schema, + ruleDescription) + } + + override def toString: String = { + +s"${joinTypeToString(joinNode.getJoinType)}" + + s"(condition: (${joinConditionToString(schema.logicalType, +joinNode.getCondition, getExpressionString)}), " + + s"select: (${joinSelectionToString(schema.logicalType)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("condition", joinConditionToString(schema.logicalType, 
+joinNode.getCondition, getExpressionString)) + .item("select", joinSelectionToString(schema.logicalType)) + .item("joinType", joinTypeToString(joinNode.getJoinType)) + } + + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + +val config = tableEnv.getConfig + +// get the equality keys and other condition +val (leftKeys, rightKeys, otherCondition) = + JoinUtil.analyzeJoinCondition(joinNode, getExpressionString) + +if (left.isInstanceOf[StreamTableSourceScan] +|| right.isInstanceOf[StreamTableSourceScan]) { + throw new TableException( +"Join between stream and table is not supported yet.") +} +// analyze time boundary and time predicate type(proctime/rowtime) +val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) = + JoinUtil.analyzeTimeBoundary( +otherCondition, +leftSchema.logicalType.getFieldCount, +leftSchema.physicalType.getFieldCount, +schema.logicalType, +joinNode.getCluster.getRexBuilder, +config) + +val leftDataStream =
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005631#comment-16005631 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115865585 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util +import java.util.{List => JList} + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to support stream join stream, currently just support inner-join + * + * @param leftStreamWindowSizethe left stream window size + * @param rightStreamWindowSizethe right stream window size + * @param element1Type the input type of left stream + * @param element2Type the input type of right stream + * @param filterFuncthe function of other non-equi condition include time condition + * + */ +class ProcTimeInnerJoin( + private val leftStreamWindowSize: Long, + private val rightStreamWindowSize: Long, + private val element1Type: TypeInformation[CRow], + private val element2Type: TypeInformation[CRow], + private val filterFunc: RichFilterFunction[Row]) + extends CoProcessFunction[CRow, CRow, CRow] { + + private var outputC: CRow = _ + private var listToRemove: JList[Long] = _ + + /** state to hold left stream element **/ + private var row1MapState: MapState[Long, JList[Row]] = _ + /** state to hold right stream element **/ + private var row2MapState: MapState[Long, JList[Row]] = _ + + /** state to record last timer of left stream, 0 means no timer **/ + private var timerState1: ValueState[Long] = _ + /** state to record last timer of right stream, 0 means no timer **/ + private var timerState2: ValueState[Long] = _ + + + override def open(config: Configuration) { +outputC = new CRow(new Row(element1Type.getArity + 
element2Type.getArity), true) +filterFunc.setRuntimeContext(getRuntimeContext) +filterFunc.open(config) + +listToRemove = new util.ArrayList[Long]() + +// initialize row state +val rowListTypeInfo1: TypeInformation[JList[Row]] = +new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row1mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1) +row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1) + +val rowListTypeInfo2: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row2mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2) +row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2) + +// initialize timer state +val valueStateDescriptor1: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long]) +timerState1 =
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005633#comment-16005633 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115872865 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util +import java.util.{List => JList} + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to support stream join stream, currently just support inner-join + * + * @param leftStreamWindowSizethe left stream window size + * @param rightStreamWindowSizethe right stream window size + * @param element1Type the input type of left stream + * @param element2Type the input type of right stream + * @param filterFuncthe function of other non-equi condition include time condition + * + */ +class ProcTimeInnerJoin( + private val leftStreamWindowSize: Long, + private val rightStreamWindowSize: Long, + private val element1Type: TypeInformation[CRow], + private val element2Type: TypeInformation[CRow], + private val filterFunc: RichFilterFunction[Row]) + extends CoProcessFunction[CRow, CRow, CRow] { + + private var outputC: CRow = _ + private var listToRemove: JList[Long] = _ + + /** state to hold left stream element **/ + private var row1MapState: MapState[Long, JList[Row]] = _ + /** state to hold right stream element **/ + private var row2MapState: MapState[Long, JList[Row]] = _ + + /** state to record last timer of left stream, 0 means no timer **/ + private var timerState1: ValueState[Long] = _ --- End diff -- what are the timer states used for? I only see one `value()` call to get the value which checks `== 0` if a timer is registered. Can we make this a boolean state then? What would happen if we would not have this state? 
Would there be more timers? (Timers are unique by time.)
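The reviewer's question points at what `timerState1`/`timerState2` are for: they only record whether a cleanup timer is already pending, so a boolean flag would suffice to avoid re-registering a timer for every arriving row. A minimal sketch of that guard pattern (hypothetical names; a counter stands in for calls to Flink's timer service):

```java
// Sketch: register a cleanup timer only when none is pending, tracked by a boolean flag.
class TimerGuard {
    private boolean timerRegistered = false;  // stand-in for a ValueState[Boolean]
    private int registrations = 0;            // counts would-be registerProcessingTimeTimer calls

    void onElement() {
        if (!timerRegistered) {
            registrations++;                  // here the real operator would register now + windowSize
            timerRegistered = true;
        }
    }

    void onTimer() {
        timerRegistered = false;              // the next element may schedule a fresh timer
    }

    int registrationCount() { return registrations; }
}
```

Since Flink deduplicates timers registered for the same timestamp, the flag mainly saves the redundant registration calls; without it, every element would attempt one.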
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005646#comment-16005646 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115868597 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util +import java.util.{List => JList} + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to support stream join stream, currently just support inner-join + * + * @param leftStreamWindowSizethe left stream window size + * @param rightStreamWindowSizethe right stream window size + * @param element1Type the input type of left stream + * @param element2Type the input type of right stream + * @param filterFuncthe function of other non-equi condition include time condition + * + */ +class ProcTimeInnerJoin( + private val leftStreamWindowSize: Long, + private val rightStreamWindowSize: Long, + private val element1Type: TypeInformation[CRow], + private val element2Type: TypeInformation[CRow], + private val filterFunc: RichFilterFunction[Row]) + extends CoProcessFunction[CRow, CRow, CRow] { + + private var outputC: CRow = _ + private var listToRemove: JList[Long] = _ + + /** state to hold left stream element **/ + private var row1MapState: MapState[Long, JList[Row]] = _ + /** state to hold right stream element **/ + private var row2MapState: MapState[Long, JList[Row]] = _ + + /** state to record last timer of left stream, 0 means no timer **/ + private var timerState1: ValueState[Long] = _ + /** state to record last timer of right stream, 0 means no timer **/ + private var timerState2: ValueState[Long] = _ + + + override def open(config: Configuration) { +outputC = new CRow(new Row(element1Type.getArity + 
element2Type.getArity), true) +filterFunc.setRuntimeContext(getRuntimeContext) +filterFunc.open(config) + +listToRemove = new util.ArrayList[Long]() + +// initialize row state +val rowListTypeInfo1: TypeInformation[JList[Row]] = +new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType) --- End diff -- indent
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005637#comment-16005637 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115864982 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util +import java.util.{List => JList} + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to support stream join stream, currently just support inner-join + * + * @param leftStreamWindowSizethe left stream window size + * @param rightStreamWindowSizethe right stream window size + * @param element1Type the input type of left stream + * @param element2Type the input type of right stream + * @param filterFuncthe function of other non-equi condition include time condition + * + */ +class ProcTimeInnerJoin( + private val leftStreamWindowSize: Long, + private val rightStreamWindowSize: Long, + private val element1Type: TypeInformation[CRow], --- End diff -- I think we can use `TypeInformation[Row]` here and don't need the `CRow` wrapper. > Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 > URL: https://issues.apache.org/jira/browse/FLINK-6232 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. 
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005642#comment-16005642 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115868097 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.runtime.FilterRunner +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyzes the join condition to extract the equi-join condition and the remaining condition. +* @param joinNode the LogicalJoin node +* @param expression the function that generates the condition string +*/ + private[flink] def analyzeJoinCondition( +joinNode: FlinkLogicalJoin, +expression: (RexNode, List[String], Option[List[RexNode]]) => String) = { + +val joinInfo = joinNode.analyzeCondition() +val keyPairs = joinInfo.pairs.toList +val otherCondition = + if(joinInfo.isEqui) null + else joinInfo.getRemaining(joinNode.getCluster.getRexBuilder) + +val leftKeys = ArrayBuffer.empty[Int] +val rightKeys = ArrayBuffer.empty[Int] +if (!keyPairs.isEmpty) { + val leftFields = joinNode.getLeft.getRowType.getFieldList + val rightFields = joinNode.getRight.getRowType.getFieldList + + keyPairs.foreach(pair => { +val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName +val rightKeyType = 
rightFields.get(pair.target).getType.getSqlTypeName + +// check if keys are compatible +if (leftKeyType == rightKeyType) { + // add key pair + leftKeys.append(pair.source) + rightKeys.append(pair.target) +} else { + throw TableException( +"Equality join predicate on incompatible types.\n" + + s"\tLeft: ${joinNode.getLeft.toString},\n" + + s"\tRight: ${joinNode.getRight.toString},\n" + + s"\tCondition: (${expression(joinNode.getCondition, +joinNode.getRowType.getFieldNames.toList, None)})" + ) +} + }) +} +(leftKeys.toArray, rightKeys.toArray, otherCondition) + } + + /** +* Analyzes the time condition to determine the time boundary for each stream and the time type, +* and returns the condition without the time condition. +* +* @param condition the remaining condition, including the time condition +* @param leftFieldCount the field count of the left stream +* @param inputType the type of the connected left and right streams +* @param rexBuilder the utility to build RexNodes +* @param config the table environment config +*/ + private[flink] def
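The key extraction quoted above walks Calcite's (source, target) index pairs and keeps a pair only when the field types on both sides match. A hypothetical plain-Java simplification of that loop (field types stood in by strings, not Calcite's `SqlTypeName`):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the equi-key extraction in analyzeJoinCondition:
// keep a key pair only if the joined fields have compatible types.
public class KeyPairExtraction {

    // pairs[i] = {leftFieldIndex, rightFieldIndex}
    public static List<int[]> compatiblePairs(int[][] pairs,
                                              String[] leftTypes,
                                              String[] rightTypes) {
        List<int[]> keys = new ArrayList<>();
        for (int[] p : pairs) {
            if (!leftTypes[p[0]].equals(rightTypes[p[1]])) {
                // mirrors the TableException thrown in the quoted diff
                throw new IllegalArgumentException(
                    "Equality join predicate on incompatible types.");
            }
            keys.add(p);
        }
        return keys;
    }

    public static void main(String[] args) {
        String[] left = {"BIGINT", "VARCHAR"};
        String[] right = {"VARCHAR", "BIGINT"};
        // orderId (BIGINT) = orderId (BIGINT): one compatible key pair
        System.out.println(compatiblePairs(new int[][]{{0, 1}}, left, right).size());
    }
}
```

As the reviewer notes later in the thread, Calcite's validation already guarantees type compatibility, so the explicit check may be redundant in practice.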
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005622#comment-16005622 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115850396 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamTableEnvironment, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +/** + * Flink RelNode which matches along with JoinOperator and its related operations. + */ +class DataStreamJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + joinNode: FlinkLogicalJoin, + leftSchema: RowSchema, + schema: RowSchema, + ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with CommonJoin + with DataStreamRel { + + override def deriveRowType() = schema.logicalType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinNode, + leftSchema, + schema, + ruleDescription) + } + + override def toString: String = { + +s"${joinTypeToString(joinNode.getJoinType)}" + + s"(condition: (${joinConditionToString(schema.logicalType, +joinNode.getCondition, getExpressionString)}), " + + s"select: (${joinSelectionToString(schema.logicalType)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("condition", joinConditionToString(schema.logicalType, 
+joinNode.getCondition, getExpressionString)) + .item("select", joinSelectionToString(schema.logicalType)) + .item("joinType", joinTypeToString(joinNode.getJoinType)) + } + + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + +val config = tableEnv.getConfig + +// get the equality keys and other condition +val (leftKeys, rightKeys, otherCondition) = + JoinUtil.analyzeJoinCondition(joinNode, getExpressionString) --- End diff -- This can be done by ``` val joinInfo = JoinInfo.of(leftNode, rightNode, condition) val leftKeys: Array[Int] = joinInfo.leftKeys.toIntArray val rightKeys: Array[Int] = joinInfo.rightKeys.toIntArray val otherCondition = joinInfo.getRemaining(cluster.getRexBuilder) ``` So we do not need a special method for this. The type checks are not required, because Calcite will make sure during validation that only compatible types are compared. So we can be sure that types are valid. > Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 >
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005635#comment-16005635 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115875501 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util +import java.util.{List => JList} + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction that joins two streams; currently only inner joins are supported. + * + * @param leftStreamWindowSize the left stream window size + * @param rightStreamWindowSize the right stream window size + * @param element1Type the input type of the left stream + * @param element2Type the input type of the right stream + * @param filterFunc the filter for the remaining non-equi conditions, including the time condition + * + */ +class ProcTimeInnerJoin( + private val leftStreamWindowSize: Long, + private val rightStreamWindowSize: Long, + private val element1Type: TypeInformation[CRow], + private val element2Type: TypeInformation[CRow], + private val filterFunc: RichFilterFunction[Row]) + extends CoProcessFunction[CRow, CRow, CRow] { + + private var outputC: CRow = _ + private var listToRemove: JList[Long] = _ + + /** state to hold left stream elements **/ + private var row1MapState: MapState[Long, JList[Row]] = _ + /** state to hold right stream elements **/ + private var row2MapState: MapState[Long, JList[Row]] = _ + + /** state to record the last timer of the left stream; 0 means no timer **/ + private var timerState1: ValueState[Long] = _ + /** state to record the last timer of the right stream; 0 means no timer **/ + private var timerState2: ValueState[Long] = _ + + + override def open(config: Configuration) { +outputC = new CRow(new Row(element1Type.getArity + 
element2Type.getArity), true) +filterFunc.setRuntimeContext(getRuntimeContext) +filterFunc.open(config) + +listToRemove = new util.ArrayList[Long]() + +// initialize row state +val rowListTypeInfo1: TypeInformation[JList[Row]] = +new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row1mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1) +row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1) + +val rowListTypeInfo2: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row2mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2) +row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2) + +// initialize timer state +val valueStateDescriptor1: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long]) +timerState1 =
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005638#comment-16005638 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115852606 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamTableEnvironment, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +/** + * Flink RelNode which matches along with JoinOperator and its related operations. + */ +class DataStreamJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + joinNode: FlinkLogicalJoin, + leftSchema: RowSchema, + schema: RowSchema, + ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with CommonJoin + with DataStreamRel { + + override def deriveRowType() = schema.logicalType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinNode, + leftSchema, + schema, + ruleDescription) + } + + override def toString: String = { + +s"${joinTypeToString(joinNode.getJoinType)}" + + s"(condition: (${joinConditionToString(schema.logicalType, +joinNode.getCondition, getExpressionString)}), " + + s"select: (${joinSelectionToString(schema.logicalType)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("condition", joinConditionToString(schema.logicalType, 
+joinNode.getCondition, getExpressionString)) + .item("select", joinSelectionToString(schema.logicalType)) + .item("joinType", joinTypeToString(joinNode.getJoinType)) + } + + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + +val config = tableEnv.getConfig + +// get the equality keys and other condition +val (leftKeys, rightKeys, otherCondition) = + JoinUtil.analyzeJoinCondition(joinNode, getExpressionString) + +if (left.isInstanceOf[StreamTableSourceScan] --- End diff -- I don't think we need this restriction. A `StreamTableSourceScan` produces a stream not a table. So by this we forbid regular stream-stream joins. > Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 > URL: https://issues.apache.org/jira/browse/FLINK-6232 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-join on
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005643#comment-16005643 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115866207 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util +import java.util.{List => JList} + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction that joins two streams; currently only inner joins are supported. + * + * @param leftStreamWindowSize the left stream window size + * @param rightStreamWindowSize the right stream window size + * @param element1Type the input type of the left stream + * @param element2Type the input type of the right stream + * @param filterFunc the filter for the remaining non-equi conditions, including the time condition + * + */ +class ProcTimeInnerJoin( + private val leftStreamWindowSize: Long, + private val rightStreamWindowSize: Long, + private val element1Type: TypeInformation[CRow], + private val element2Type: TypeInformation[CRow], + private val filterFunc: RichFilterFunction[Row]) + extends CoProcessFunction[CRow, CRow, CRow] { + + private var outputC: CRow = _ + private var listToRemove: JList[Long] = _ + + /** state to hold left stream elements **/ + private var row1MapState: MapState[Long, JList[Row]] = _ + /** state to hold right stream elements **/ + private var row2MapState: MapState[Long, JList[Row]] = _ + + /** state to record the last timer of the left stream; 0 means no timer **/ + private var timerState1: ValueState[Long] = _ + /** state to record the last timer of the right stream; 0 means no timer **/ + private var timerState2: ValueState[Long] = _ + + + override def open(config: Configuration) { +outputC = new CRow(new Row(element1Type.getArity + 
element2Type.getArity), true) +filterFunc.setRuntimeContext(getRuntimeContext) +filterFunc.open(config) + +listToRemove = new util.ArrayList[Long]() + +// initialize row state +val rowListTypeInfo1: TypeInformation[JList[Row]] = +new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row1mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1) +row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1) + +val rowListTypeInfo2: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row2mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2) +row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2) + +// initialize timer state +val valueStateDescriptor1: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long]) +timerState1 =
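The `MapState[Long, JList[Row]]` initialized in the quoted open() method buffers each stream's rows keyed by arrival time, so that a timer can later prune every bucket that has fallen out of the join window. A self-contained sketch of that buffering pattern (an illustration of the idea, not Flink's keyed state; `TimeBucketedBuffer` and its inclusive-cutoff expiry are assumptions of this sketch):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of per-timestamp row buffering with window-based expiry,
// analogous to ProcTimeInnerJoin's MapState<Long, List<Row>> plus cleanup timers.
public class TimeBucketedBuffer {
    private final long windowSizeMs;
    private final TreeMap<Long, List<String>> rowsByTime = new TreeMap<>();

    TimeBucketedBuffer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Buffer a row under its arrival timestamp.
    void add(long ts, String row) {
        rowsByTime.computeIfAbsent(ts, k -> new ArrayList<>()).add(row);
    }

    // Drop every bucket at or before now - windowSize; only rows inside
    // (now - windowSize, now] can still join with newly arriving rows.
    void expire(long now) {
        rowsByTime.headMap(now - windowSizeMs, true).clear();
    }

    int size() {
        return rowsByTime.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        TimeBucketedBuffer buf = new TimeBucketedBuffer(1_000L);
        buf.add(0L, "a");
        buf.add(500L, "b");
        buf.expire(1_200L); // cutoff at 200 ms: the bucket at 0 is pruned
        System.out.println(buf.size()); // 1
    }
}
```

A sorted map makes the expiry a single range operation; Flink's `MapState` has no sorted view, which is why the operator tracks timers separately (`timerState1`/`timerState2`) instead.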
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005639#comment-16005639 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115864005 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FilterRunner.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.functions.util.FunctionUtils +import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction} +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.codegen.Compiler +import org.slf4j.LoggerFactory + +class FilterRunner[IN] ( --- End diff -- I don't think we need this additional wrapper. We could just pass the generated code of the `FilterFunction` to the `CoProcessFunction` and compile it in its open() method. 
> Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 > URL: https://issues.apache.org/jira/browse/FLINK-6232 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface.
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005624#comment-16005624 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115796448 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin + +import org.apache.calcite.rel.core.JoinRelType +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema + +class DataStreamJoinRule + extends ConverterRule( + classOf[FlinkLogicalJoin], + FlinkConventions.LOGICAL, + FlinkConventions.DATASTREAM, + "DataStreamJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] + +val joinInfo = join.analyzeCondition + +// joins require an equi-condition or a conjunctive predicate with at least one equi-condition +// and disable outer joins with non-equality predicates +!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER) --- End diff -- Is this condition correct? Are outer joins (incl. LEFT, RIGHT, FULL OUTER) supported if the join is an equality join (all conjunctive predicates are equality predicates)? > Support proctime inner equi-join between two streams in the SQL API > --- > > Key: FLINK-6232 > URL: https://issues.apache.org/jira/browse/FLINK-6232 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. 
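The `matches()` predicate quoted in this review, `!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)`, can be checked with a small truth table (a hypothetical standalone model of the boolean logic, not the rule itself):

```java
// Hypothetical model of DataStreamJoinRule.matches():
// accept when at least one equi-pair exists AND
// (the whole condition is equi-only OR the join is an inner join).
public class JoinRuleMatch {

    static boolean matches(boolean hasEquiPairs, boolean isEqui, boolean isInner) {
        return hasEquiPairs && (isEqui || isInner);
    }

    public static void main(String[] args) {
        System.out.println(matches(true, true, false));  // outer equi-only join: accepted
        System.out.println(matches(true, false, false)); // outer join with non-equi predicate: rejected
        System.out.println(matches(false, false, true)); // inner join without equi keys: rejected
    }
}
```

So, as written, the predicate does admit LEFT/RIGHT/FULL OUTER joins whenever all conjunctive predicates are equality predicates, which is exactly the behavior the reviewer is asking the author to confirm.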
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005627#comment-16005627 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115874222 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamTableEnvironment, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +/** + * Flink RelNode which matches along with JoinOperator and its related operations. + */ +class DataStreamJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + joinNode: FlinkLogicalJoin, + leftSchema: RowSchema, + schema: RowSchema, + ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with CommonJoin + with DataStreamRel { + + override def deriveRowType() = schema.logicalType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinNode, + leftSchema, + schema, + ruleDescription) + } + + override def toString: String = { + +s"${joinTypeToString(joinNode.getJoinType)}" + + s"(condition: (${joinConditionToString(schema.logicalType, +joinNode.getCondition, getExpressionString)}), " + + s"select: (${joinSelectionToString(schema.logicalType)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("condition", joinConditionToString(schema.logicalType, 
+joinNode.getCondition, getExpressionString)) + .item("select", joinSelectionToString(schema.logicalType)) + .item("joinType", joinTypeToString(joinNode.getJoinType)) + } + + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + +val config = tableEnv.getConfig + +// get the equality keys and other condition +val (leftKeys, rightKeys, otherCondition) = + JoinUtil.analyzeJoinCondition(joinNode, getExpressionString) + +if (left.isInstanceOf[StreamTableSourceScan] +|| right.isInstanceOf[StreamTableSourceScan]) { + throw new TableException( +"Join between stream and table is not supported yet.") +} +// analyze time boundary and time predicate type(proctime/rowtime) +val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) = + JoinUtil.analyzeTimeBoundary( +otherCondition, +leftSchema.logicalType.getFieldCount, +leftSchema.physicalType.getFieldCount, +schema.logicalType, +joinNode.getCluster.getRexBuilder, +config) + +val leftDataStream =
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005647#comment-16005647 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115873581 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream join stream, currently just support inner-join
+  *
+  * @param leftStreamWindowSize  the left stream window size
+  * @param rightStreamWindowSize the right stream window size
+  * @param element1Type          the input type of left stream
+  * @param element2Type          the input type of right stream
+  * @param filterFunc            the function of other non-equi condition include time condition
+  */
+class ProcTimeInnerJoin(
+    private val leftStreamWindowSize: Long,
+    private val rightStreamWindowSize: Long,
+    private val element1Type: TypeInformation[CRow],
+    private val element2Type: TypeInformation[CRow],
+    private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 =
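The `open()` method above buckets incoming rows in `MapState` keyed by their arrival timestamp, so that a cleanup timer can later drop whole buckets that have fallen out of the join window. A Flink-free sketch of that expiration step, using a plain mutable map as a stand-in for `MapState` (names and row representation are illustrative, not the PR's code):

```scala
import scala.collection.mutable

// Rows of one input, bucketed per arrival timestamp. When a cleanup timer
// fires, every bucket older than (currentTime - windowSize) can no longer
// join with any future row of the other input and is removed.
def expireBuckets(
    buckets: mutable.Map[Long, List[String]],
    currentTime: Long,
    windowSize: Long): Unit = {
  val expired = buckets.keys.filter(_ < currentTime - windowSize).toList
  expired.foreach(buckets.remove)
}

val rows = mutable.Map(1000L -> List("a"), 5000L -> List("b"))
expireBuckets(rows, currentTime = 7000L, windowSize = 3000L)
// the bucket at 1000 (< 7000 - 3000) is removed, the one at 5000 is kept
```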
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005625#comment-16005625 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115851247 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+import java.util.EnumSet
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind}
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.runtime.FilterRunner
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+
+object JoinUtil {
+
+  /**
+    * Analyze the join condition to get the equi-condition and other condition
+    * @param joinNode   the LogicalJoin node
+    * @param expression the function to generate the condition string
+    */
+  private[flink] def analyzeJoinCondition(
--- End diff --

I don't think we need this method. We can analyze the condition with Calcite's `JoinInfo`. Calcite's validation checks before optimization and translation that the types of the conditions are OK.

> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: hongyuhong
> Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time-condition {{o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR}} only can use proctime that is a system attribute, the time condition only support bounded time range like {{o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not support unbounded like {{o.proctime > s.protime}}, and should include both two stream's proctime attribute, {{o.proctime between proctime() and proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
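The bounded time condition in the query above means an Orders row matches a Shipments row only if its proctime falls in `[s.proctime, s.proctime + 1 hour]`. A tiny, self-contained sketch of that predicate on millisecond timestamps (illustrative only, not the operator's code):

```scala
// o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR,
// modeled on epoch milliseconds.
val oneHourMs = 60L * 60 * 1000

def withinWindow(orderTime: Long, shipTime: Long): Boolean =
  orderTime >= shipTime && orderTime <= shipTime + oneHourMs

// an order processed shortly after its shipment joins; one processed
// before the shipment, or more than an hour later, does not
val joins    = withinWindow(orderTime = 1500L, shipTime = 1000L)
val tooEarly = withinWindow(orderTime = 999L, shipTime = 1000L)
```

Because the range is bounded on both sides, each operator only ever needs to retain rows from a finite window of the other stream, which is what makes the state cleanup in `ProcTimeInnerJoin` possible.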
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005640#comment-16005640 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115868422 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream join stream, currently just support inner-join
+  *
+  * @param leftStreamWindowSize  the left stream window size
+  * @param rightStreamWindowSize the right stream window size
+  * @param element1Type          the input type of left stream
+  * @param element2Type          the input type of right stream
+  * @param filterFunc            the function of other non-equi condition include time condition
+  */
+class ProcTimeInnerJoin(
+    private val leftStreamWindowSize: Long,
+    private val rightStreamWindowSize: Long,
+    private val element1Type: TypeInformation[CRow],
+    private val element2Type: TypeInformation[CRow],
+    private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
--- End diff --

use a `JoinFunction` instead of a `FilterFunction`. Compile the code here instead of using a wrapper.

> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: hongyuhong
> Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time-condition {{o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR}} only can use proctime that is a system attribute, the time condition only support bounded
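To make the reviewer's suggestion concrete: with a `FilterFunction` the operator must first build the concatenated output row and then test it through the wrapper, while a `JoinFunction`-shaped condition is evaluated on the two input rows directly and the output row is only built on a match. A simplified, Flink-free sketch of the two shapes (the predicate, a bounded-difference check standing in for a proctime bound, and the `Vector[Long]` row representation are hypothetical):

```scala
// FilterFunction style (current PR): concatenate first, then test the
// combined row via the wrapped filter.
def filterStyle(left: Vector[Long], right: Vector[Long]): Option[Vector[Long]] = {
  val joined = left ++ right
  val pass = math.abs(joined(0) - joined(left.length)) <= 100
  if (pass) Some(joined) else None
}

// JoinFunction style (reviewer's suggestion): evaluate the condition on the
// two input rows directly; the output row is only materialized on a match.
def joinStyle(left: Vector[Long], right: Vector[Long]): Option[Vector[Long]] =
  if (math.abs(left(0) - right(0)) <= 100) Some(left ++ right) else None

// both shapes produce the same result; the JoinFunction shape just avoids
// building the concatenated row for non-matching pairs
val sameResult = filterStyle(Vector(1L, 2L), Vector(3L)) == joinStyle(Vector(1L, 2L), Vector(3L))
```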
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005628#comment-16005628 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115847866 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    joinNode: FlinkLogicalJoin,
+    leftSchema: RowSchema,
--- End diff --

why don't we need the `rightSchema`?

> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: hongyuhong
> Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time-condition {{o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR}} only can use proctime that is a system attribute, the time condition only support bounded time range like {{o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not support unbounded like {{o.proctime > s.protime}}, and should include both two stream's proctime attribute, {{o.proctime between proctime() and proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005626#comment-16005626 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115873251 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream join stream, currently just support inner-join
+  *
+  * @param leftStreamWindowSize  the left stream window size
+  * @param rightStreamWindowSize the right stream window size
+  * @param element1Type          the input type of left stream
+  * @param element2Type          the input type of right stream
+  * @param filterFunc            the function of other non-equi condition include time condition
+  */
+class ProcTimeInnerJoin(
+    private val leftStreamWindowSize: Long,
+    private val rightStreamWindowSize: Long,
+    private val element1Type: TypeInformation[CRow],
+    private val element2Type: TypeInformation[CRow],
+    private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 =
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005629#comment-16005629 ] ASF GitHub Bot commented on FLINK-6232: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115844805 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+
+class DataStreamJoinRule
+  extends ConverterRule(
+    classOf[FlinkLogicalJoin],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+
+    val joinInfo = join.analyzeCondition
+
+    // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
+    // and disable outer joins with non-equality predicates
+    !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
--- End diff --

I assume not, because the join type is not passed on to the `DataStreamJoin`.

> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: hongyuhong
> Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time-condition {{o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR}} only can use proctime that is a system attribute, the time condition only support bounded time range like {{o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not support unbounded like {{o.proctime > s.protime}}, and should include both two stream's proctime attribute, {{o.proctime between proctime() and proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
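The `matches()` check in `DataStreamJoinRule` reduces to a small boolean rule: there must be at least one equi-pair, and a join whose condition is not purely equi is only accepted when it is an inner join. A sketch of that rule in isolation (the parameters are stand-ins for the fields of Calcite's `JoinInfo` and the join type):

```scala
// equiPairCount  -> joinInfo.pairs().size()
// isEqui         -> joinInfo.isEqui
// joinTypeIsInner-> join.getJoinType == JoinRelType.INNER
def matchesRule(equiPairCount: Int, isEqui: Boolean, joinTypeIsInner: Boolean): Boolean =
  equiPairCount > 0 && (isEqui || joinTypeIsInner)

// inner join with one equi-pair plus a residual (time) predicate: accepted
val innerWithResidual = matchesRule(equiPairCount = 1, isEqui = false, joinTypeIsInner = true)
// no equi-pair at all, or an outer join with a residual predicate: rejected
val noEquiKeys       = matchesRule(equiPairCount = 0, isEqui = true, joinTypeIsInner = true)
val outerWithResidual = matchesRule(equiPairCount = 1, isEqui = false, joinTypeIsInner = false)
```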
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115855446 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    joinNode: FlinkLogicalJoin,
+    leftSchema: RowSchema,
+    schema: RowSchema,
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      joinNode,
+      leftSchema,
+      schema,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    s"${joinTypeToString(joinNode.getJoinType)}" +
+      s"(condition: (${joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString)}), " +
+      s"select: (${joinSelectionToString(schema.logicalType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("condition", joinConditionToString(schema.logicalType,
+        joinNode.getCondition, getExpressionString))
+      .item("select", joinSelectionToString(schema.logicalType))
+      .item("joinType", joinTypeToString(joinNode.getJoinType))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+
+    val config = tableEnv.getConfig
+
+    // get the equality keys and other condition
+    val (leftKeys, rightKeys, otherCondition) =
+      JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
+
+    if (left.isInstanceOf[StreamTableSourceScan]
+      || right.isInstanceOf[StreamTableSourceScan]) {
+      throw new TableException(
+        "Join between stream and table is not supported yet.")
+    }
+    // analyze time boundary and time predicate type (proctime/rowtime)
+    val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) =
+      JoinUtil.analyzeTimeBoundary(
+        otherCondition,
+        leftSchema.logicalType.getFieldCount,
+        leftSchema.physicalType.getFieldCount,
+        schema.logicalType,
+        joinNode.getCluster.getRexBuilder,
+        config)
+
+    val leftDataStream = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val rightDataStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+    // generate other condition filter function
+    val filterFunction =
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115873645 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to support stream join stream, currently just support inner-join
+  *
+  * @param leftStreamWindowSize  the left stream window size
+  * @param rightStreamWindowSize the right stream window size
+  * @param element1Type          the input type of left stream
+  * @param element2Type          the input type of right stream
+  * @param filterFunc            the function of other non-equi condition include time condition
+  */
+class ProcTimeInnerJoin(
+    private val leftStreamWindowSize: Long,
+    private val rightStreamWindowSize: Long,
+    private val element1Type: TypeInformation[CRow],
+    private val element2Type: TypeInformation[CRow],
+    private val filterFunc: RichFilterFunction[Row])
+  extends CoProcessFunction[CRow, CRow, CRow] {
+
+  private var outputC: CRow = _
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  override def open(config: Configuration) {
+    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
+    filterFunc.setRuntimeContext(getRuntimeContext)
+    filterFunc.open(config)
+
+    listToRemove = new util.ArrayList[Long]()
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
+
+    val valueStateDescriptor2: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+    timerState2 =
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115868097 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.runtime.FilterRunner +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze the join condition to extract the equi-condition and the remaining condition. +* @param joinNode the logical join node +* @param expression the function that generates the condition string +*/ + private[flink] def analyzeJoinCondition( +joinNode: FlinkLogicalJoin, +expression: (RexNode, List[String], Option[List[RexNode]]) => String) = { + +val joinInfo = joinNode.analyzeCondition() +val keyPairs = joinInfo.pairs.toList +val otherCondition = + if (joinInfo.isEqui) null + else joinInfo.getRemaining(joinNode.getCluster.getRexBuilder) + +val leftKeys = ArrayBuffer.empty[Int] +val rightKeys = ArrayBuffer.empty[Int] +if (!keyPairs.isEmpty) { + val leftFields = joinNode.getLeft.getRowType.getFieldList + val rightFields = joinNode.getRight.getRowType.getFieldList + + keyPairs.foreach(pair => { +val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName +val rightKeyType = 
rightFields.get(pair.target).getType.getSqlTypeName + +// check if keys are compatible +if (leftKeyType == rightKeyType) { + // add key pair + leftKeys.append(pair.source) + rightKeys.append(pair.target) +} else { + throw TableException( +"Equality join predicate on incompatible types.\n" + + s"\tLeft: ${joinNode.getLeft.toString},\n" + + s"\tRight: ${joinNode.getRight.toString},\n" + + s"\tCondition: (${expression(joinNode.getCondition, +joinNode.getRowType.getFieldNames.toList, None)})" + ) +} + }) +} +(leftKeys.toArray, rightKeys.toArray, otherCondition) + } + + /** +* Analyze the time condition to get the time boundary for each stream, determine the time type, +* and return the condition without the time condition. +* +* @param condition the other condition, including the time condition +* @param leftFieldCount the field count of the left stream +* @param inputType the type of the connected left and right streams +* @param rexBuilder a utility to build a RexNode +* @param config the table environment config +*/ + private[flink] def analyzeTimeBoundary( --- End diff -- I did not go through the details of this method yet, but apparently it analyzes and decomposes the time-based condition. It would be very good to have a couple of unit tests which only check this
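The kind of unit test requested above would pin down how a time predicate is decomposed into per-stream window sizes. As a plain-Java illustration only (this is not the PR's RexNode-based API; `TimeBoundaryDemo` and its string encoding of conjuncts are hypothetical stand-ins), a condition like `l.proctime >= r.proctime - 10 AND l.proctime <= r.proctime + 5` should yield a left retention window of 10 and a right retention window of 5:

```java
import java.util.List;

// Hypothetical stand-in for the decomposition analyzeTimeBoundary performs:
// extract, from a pair of time inequalities, how long each stream's rows
// must be retained to stay joinable.
public class TimeBoundaryDemo {

    static long[] analyze(List<String> conjuncts) {
        long leftWindowSize = 0;   // from  l >= r - X  =>  X
        long rightWindowSize = 0;  // from  l <= r + Y  =>  Y
        for (String c : conjuncts) {
            // expected token shape: ["l", op, "r", sign, offset]
            String[] t = c.trim().split("\\s+");
            long offset = Long.parseLong(t[4]);
            if (t[1].equals(">=")) {
                leftWindowSize = offset;   // left rows stay joinable for X
            } else if (t[1].equals("<=")) {
                rightWindowSize = offset;  // right rows stay joinable for Y
            } else {
                throw new IllegalArgumentException("not a time bound: " + c);
            }
        }
        return new long[] {leftWindowSize, rightWindowSize};
    }

    public static void main(String[] args) {
        long[] b = analyze(List.of("l >= r - 10", "l <= r + 5"));
        System.out.println(b[0] + "," + b[1]); // prints "10,5"
    }
}
```

A real test would feed the equivalent RexNode tree to `analyzeTimeBoundary` and assert the same two sizes plus the stripped remaining condition.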
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115851247 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinUtil.scala --- @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import java.math.{BigDecimal => JBigDecimal} +import java.util +import java.util.EnumSet + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlIntervalQualifier, SqlKind} +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.runtime.FilterRunner +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + + +object JoinUtil { + + /** +* Analyze the join condition to extract the equi-condition and the remaining condition. +* @param joinNode the logical join node +* @param expression the function that generates the condition string +*/ + private[flink] def analyzeJoinCondition( --- End diff -- I don't think we need this method. We can analyze the condition with Calcite's `JoinInfo`. Calcite's validation checks before optimization and translation that the types of the conditions are OK. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
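The split that Calcite's `JoinInfo` performs, which the comment above recommends relying on, can be sketched in plain Java (the `Conjunct` record and field-index encoding here are illustrative stand-ins, not Calcite's types): equality comparisons between a left field and a right field become key pairs, and everything else stays in the remaining non-equi condition.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the JoinInfo-style split: equi conjuncts become
// key pairs (akin to JoinInfo.pairs()), the rest is the remaining condition
// (akin to JoinInfo.getRemaining()). Conjunct is a hypothetical stand-in
// for a RexNode conjunct.
public class JoinInfoSketch {

    record Conjunct(boolean isEquality, int leftField, int rightField, String text) {}

    record Split(List<int[]> keyPairs, List<String> remaining) {}

    static Split analyze(List<Conjunct> condition) {
        List<int[]> pairs = new ArrayList<>();
        List<String> remaining = new ArrayList<>();
        for (Conjunct c : condition) {
            if (c.isEquality()) {
                pairs.add(new int[] {c.leftField(), c.rightField()});
            } else {
                remaining.add(c.text());
            }
        }
        return new Split(pairs, remaining);
    }

    public static void main(String[] args) {
        Split s = analyze(List.of(
            new Conjunct(true, 0, 2, "l.a = r.a"),
            new Conjunct(false, -1, -1, "l.time > r.time - 10")));
        System.out.println(s.keyPairs().size() + " pair, "
            + s.remaining().size() + " remaining"); // prints "1 pair, 1 remaining"
    }
}
```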
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115866207 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala ---
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115873251 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala ---
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115868597 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- +val rowListTypeInfo1: TypeInformation[JList[Row]] = +new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType) --- End diff -- indent
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115874115 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FilterRunner.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.functions.util.FunctionUtils +import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction} +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.codegen.Compiler +import org.slf4j.LoggerFactory + +class FilterRunner[IN] ( --- End diff -- Actually, we could use a `JoinFunction` instead of a `FilterFunction` as well.
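The suggestion above is a signature-level change: Flink's `FilterFunction` exposes `boolean filter(value)` over one input (here, a pre-concatenated row), while `JoinFunction` exposes `join(first, second)` over both inputs and returns the combined result directly. A self-contained sketch of the contrast, using local mirrors of the two interface shapes and plain `String[]` rows instead of Flink's `Row`:

```java
// Self-contained contrast of the two shapes discussed above. The nested
// interfaces mirror the FilterFunction/JoinFunction signatures; rows are
// simplified to String[] for the example.
public class FilterVsJoin {

    interface FilterFunction<T> { boolean filter(T value) throws Exception; }

    interface JoinFunction<IN1, IN2, OUT> { OUT join(IN1 first, IN2 second) throws Exception; }

    // FilterFunction style: the condition runs over an already-concatenated
    // row, so the caller must build that row first.
    static final FilterFunction<String[]> COND = row -> row[0].equals(row[2]);

    // JoinFunction style: both inputs arrive separately; the function checks
    // the condition and builds the output row in one step (null = no match).
    static String[] innerJoin(String[] l, String[] r) {
        if (!l[0].equals(r[0])) return null;
        String[] out = new String[l.length + r.length];
        System.arraycopy(l, 0, out, 0, l.length);
        System.arraycopy(r, 0, out, l.length, r.length);
        return out;
    }

    public static void main(String[] args) throws Exception {
        String[] joined = innerJoin(new String[]{"k1", "a"}, new String[]{"k1", "b"});
        System.out.println(java.util.Arrays.toString(joined)); // [k1, a, k1, b]
        System.out.println(COND.filter(joined));               // true
    }
}
```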
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115796448 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin + +import org.apache.calcite.rel.core.JoinRelType +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema + +class DataStreamJoinRule + extends ConverterRule( + classOf[FlinkLogicalJoin], + FlinkConventions.LOGICAL, + FlinkConventions.DATASTREAM, + "DataStreamJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] + +val joinInfo = join.analyzeCondition + +// joins require an equi-condition or a conjunctive predicate with at least one equi-condition +// and disable outer joins with non-equality predicates +!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER) --- End diff -- Is this condition correct? Are outer joins (incl. LEFT, RIGHT, FULL OUTER) supported if the join is an equality join (all conjunctive predicates are equality predicates)?
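The guard in question, `!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)`, can be tabulated in plain Java to make the reviewer's question concrete (the enum is a local stand-in for Calcite's `JoinRelType`): it accepts any join type when the predicate is purely equi, but once a non-equi remainder exists only INNER joins match.

```java
// Local stand-in for Calcite's JoinRelType; tabulates the rule's guard:
// a join matches iff it has at least one equi pair, and a non-equi
// remainder is only allowed for INNER joins.
public class JoinRuleGuard {

    enum JoinType { INNER, LEFT, RIGHT, FULL }

    static boolean matches(boolean hasEquiPairs, boolean isEqui, JoinType type) {
        return hasEquiPairs && (isEqui || type == JoinType.INNER);
    }

    public static void main(String[] args) {
        // purely equi predicate: every join type is accepted, incl. outer joins
        System.out.println(matches(true, true, JoinType.LEFT));    // true
        // equi pairs plus a non-equi remainder: only INNER is accepted
        System.out.println(matches(true, false, JoinType.INNER));  // true
        System.out.println(matches(true, false, JoinType.LEFT));   // false
        // no equi pair at all: never accepted
        System.out.println(matches(false, false, JoinType.INNER)); // false
    }
}
```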
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115874289 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -698,6 +698,12 @@ class CodeGenerator( s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)", List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;", s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;")) + } else if (clazz == classOf[FilterFunction[_]]) { --- End diff -- Not needed if we use `JoinFunction` instead of `FilterFunction`
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115866096 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala ---
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115864221 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamTableEnvironment, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +/** + * Flink RelNode which matches along with JoinOperator and its related operations. + */ +class DataStreamJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + joinNode: FlinkLogicalJoin, + leftSchema: RowSchema, + schema: RowSchema, + ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with CommonJoin + with DataStreamRel { + + override def deriveRowType() = schema.logicalType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinNode, + leftSchema, + schema, + ruleDescription) + } + + override def toString: String = { + +s"${joinTypeToString(joinNode.getJoinType)}" + + s"(condition: (${joinConditionToString(schema.logicalType, +joinNode.getCondition, getExpressionString)}), " + + s"select: (${joinSelectionToString(schema.logicalType)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("condition", joinConditionToString(schema.logicalType, 
+joinNode.getCondition, getExpressionString)) + .item("select", joinSelectionToString(schema.logicalType)) + .item("joinType", joinTypeToString(joinNode.getJoinType)) + } + + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + +val config = tableEnv.getConfig + +// get the equality keys and other condition +val (leftKeys, rightKeys, otherCondition) = + JoinUtil.analyzeJoinCondition(joinNode, getExpressionString) + +if (left.isInstanceOf[StreamTableSourceScan] +|| right.isInstanceOf[StreamTableSourceScan]) { + throw new TableException( +"Join between stream and table is not supported yet.") +} +// analyze time boundary and time predicate type(proctime/rowtime) +val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) = + JoinUtil.analyzeTimeBoundary( +otherCondition, +leftSchema.logicalType.getFieldCount, +leftSchema.physicalType.getFieldCount, +schema.logicalType, +joinNode.getCluster.getRexBuilder, +config) + +val leftDataStream = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) +val rightDataStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + +// generate other condition filter function +val filterFunction = ---
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115864005 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FilterRunner.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.functions.util.FunctionUtils +import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction} +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.codegen.Compiler +import org.slf4j.LoggerFactory + +class FilterRunner[IN] ( --- End diff -- I don't think we need this additional wrapper. We could just pass the generated code of the `FilterFunction` to the `CoProcessFunction` and compile it in its open() method. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
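The suggestion above can be restated as a sketch: instead of a `FilterRunner` wrapper, the join function itself holds a handle to the generated filter and materializes it once in `open()`. This is a minimal, Flink-free illustration; in the actual PR the handle would be the generated source string, compiled via the Table API's `Compiler` trait, and `FilterFunction`/`JoinFunctionSketch` below are simplified stand-ins, not Flink classes.

```scala
// Minimal stand-in for Flink's FilterFunction interface.
trait FilterFunction[T] { def filter(value: T): Boolean }

// Sketch of a CoProcessFunction-like class that defers building its filter
// to open(). The factory closure stands in for compiling generated code;
// the real implementation would compile a source string here instead.
class JoinFunctionSketch(compileFilter: () => FilterFunction[Int]) {
  private var filter: FilterFunction[Int] = _

  // called once per task before any elements are processed,
  // like a RichFunction's open()
  def open(): Unit = {
    filter = compileFilter()
  }

  def processElement(value: Int): Boolean = filter.filter(value)
}

val join = new JoinFunctionSketch(() => new FilterFunction[Int] {
  override def filter(value: Int): Boolean = value > 10
})
join.open()
println(join.processElement(42)) // true
println(join.processElement(3))  // false
```

The point of the design is that the operator ships only a string (the generated code) over the wire and pays the compilation cost once per task, removing one layer of function wrapping.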
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115852606 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamTableEnvironment, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +/** + * Flink RelNode which matches along with JoinOperator and its related operations. + */ +class DataStreamJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + joinNode: FlinkLogicalJoin, + leftSchema: RowSchema, + schema: RowSchema, + ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with CommonJoin + with DataStreamRel { + + override def deriveRowType() = schema.logicalType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinNode, + leftSchema, + schema, + ruleDescription) + } + + override def toString: String = { + +s"${joinTypeToString(joinNode.getJoinType)}" + + s"(condition: (${joinConditionToString(schema.logicalType, +joinNode.getCondition, getExpressionString)}), " + + s"select: (${joinSelectionToString(schema.logicalType)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("condition", joinConditionToString(schema.logicalType, 
+joinNode.getCondition, getExpressionString)) + .item("select", joinSelectionToString(schema.logicalType)) + .item("joinType", joinTypeToString(joinNode.getJoinType)) + } + + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + +val config = tableEnv.getConfig + +// get the equality keys and other condition +val (leftKeys, rightKeys, otherCondition) = + JoinUtil.analyzeJoinCondition(joinNode, getExpressionString) + +if (left.isInstanceOf[StreamTableSourceScan] --- End diff -- I don't think we need this restriction. A `StreamTableSourceScan` produces a stream, not a table, so this check would forbid regular stream-stream joins. ---
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115873581 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util +import java.util.{List => JList} + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to support stream join stream, currently just support inner-join + * + * @param leftStreamWindowSizethe left stream window size + * @param rightStreamWindowSizethe right stream window size + * @param element1Type the input type of left stream + * @param element2Type the input type of right stream + * @param filterFuncthe function of other non-equi condition include time condition + * + */ +class ProcTimeInnerJoin( + private val leftStreamWindowSize: Long, + private val rightStreamWindowSize: Long, + private val element1Type: TypeInformation[CRow], + private val element2Type: TypeInformation[CRow], + private val filterFunc: RichFilterFunction[Row]) + extends CoProcessFunction[CRow, CRow, CRow] { + + private var outputC: CRow = _ + private var listToRemove: JList[Long] = _ + + /** state to hold left stream element **/ + private var row1MapState: MapState[Long, JList[Row]] = _ + /** state to hold right stream element **/ + private var row2MapState: MapState[Long, JList[Row]] = _ + + /** state to record last timer of left stream, 0 means no timer **/ + private var timerState1: ValueState[Long] = _ + /** state to record last timer of right stream, 0 means no timer **/ + private var timerState2: ValueState[Long] = _ + + + override def open(config: Configuration) { +outputC = new CRow(new Row(element1Type.getArity + 
element2Type.getArity), true) +filterFunc.setRuntimeContext(getRuntimeContext) +filterFunc.open(config) + +listToRemove = new util.ArrayList[Long]() + +// initialize row state +val rowListTypeInfo1: TypeInformation[JList[Row]] = +new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row1mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1) +row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1) + +val rowListTypeInfo2: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row2mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2) +row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2) + +// initialize timer state +val valueStateDescriptor1: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long]) +timerState1 = getRuntimeContext.getState(valueStateDescriptor1) + +val valueStateDescriptor2: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long]) +timerState2 =
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115845838 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin + +import org.apache.calcite.rel.core.JoinRelType +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema + +class DataStreamJoinRule + extends ConverterRule( + classOf[FlinkLogicalJoin], + FlinkConventions.LOGICAL, + FlinkConventions.DATASTREAM, + "DataStreamJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] + +val joinInfo = join.analyzeCondition + +// joins require an equi-condition or a conjunctive predicate with at least one equi-condition +// and disable outer joins with non-equality predicates +!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER) + } + + override def convert(rel: RelNode): RelNode = { + +val join: FlinkLogicalJoin = rel.asInstanceOf[FlinkLogicalJoin] +val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM) +val convLeft: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.DATASTREAM) +val convRight: RelNode = RelOptRule.convert(join.getInput(1), FlinkConventions.DATASTREAM) + +new DataStreamJoin( + rel.getCluster, + traitSet, + convLeft, + convRight, + join, --- End diff -- Please pass the join condition and join type instead of the `FlinkLogicalJoin`. This is a node of the logical plan and should not be part of a node in the physical plan. ---
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115871238 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util +import java.util.{List => JList} + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to support stream join stream, currently just support inner-join + * + * @param leftStreamWindowSizethe left stream window size + * @param rightStreamWindowSizethe right stream window size + * @param element1Type the input type of left stream + * @param element2Type the input type of right stream + * @param filterFuncthe function of other non-equi condition include time condition + * + */ +class ProcTimeInnerJoin( + private val leftStreamWindowSize: Long, + private val rightStreamWindowSize: Long, + private val element1Type: TypeInformation[CRow], + private val element2Type: TypeInformation[CRow], + private val filterFunc: RichFilterFunction[Row]) + extends CoProcessFunction[CRow, CRow, CRow] { + + private var outputC: CRow = _ + private var listToRemove: JList[Long] = _ + + /** state to hold left stream element **/ + private var row1MapState: MapState[Long, JList[Row]] = _ + /** state to hold right stream element **/ + private var row2MapState: MapState[Long, JList[Row]] = _ + + /** state to record last timer of left stream, 0 means no timer **/ + private var timerState1: ValueState[Long] = _ + /** state to record last timer of right stream, 0 means no timer **/ + private var timerState2: ValueState[Long] = _ + + + override def open(config: Configuration) { +outputC = new CRow(new Row(element1Type.getArity + 
element2Type.getArity), true) +filterFunc.setRuntimeContext(getRuntimeContext) +filterFunc.open(config) + +listToRemove = new util.ArrayList[Long]() + +// initialize row state +val rowListTypeInfo1: TypeInformation[JList[Row]] = +new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row1mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1) +row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1) + +val rowListTypeInfo2: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType) +val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("row2mapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2) +row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2) + +// initialize timer state +val valueStateDescriptor1: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long]) +timerState1 = getRuntimeContext.getState(valueStateDescriptor1) + +val valueStateDescriptor2: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long]) +timerState2 =
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115872865 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util +import java.util.{List => JList} + +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to support stream join stream, currently just support inner-join + * + * @param leftStreamWindowSizethe left stream window size + * @param rightStreamWindowSizethe right stream window size + * @param element1Type the input type of left stream + * @param element2Type the input type of right stream + * @param filterFuncthe function of other non-equi condition include time condition + * + */ +class ProcTimeInnerJoin( + private val leftStreamWindowSize: Long, + private val rightStreamWindowSize: Long, + private val element1Type: TypeInformation[CRow], + private val element2Type: TypeInformation[CRow], + private val filterFunc: RichFilterFunction[Row]) + extends CoProcessFunction[CRow, CRow, CRow] { + + private var outputC: CRow = _ + private var listToRemove: JList[Long] = _ + + /** state to hold left stream element **/ + private var row1MapState: MapState[Long, JList[Row]] = _ + /** state to hold right stream element **/ + private var row2MapState: MapState[Long, JList[Row]] = _ + + /** state to record last timer of left stream, 0 means no timer **/ + private var timerState1: ValueState[Long] = _ --- End diff -- what are the timer states used for? I only see one `value()` call to get the value which checks `== 0` if a timer is registered. Can we make this a boolean state then? What would happen if we would not have this state? 
Would there be more timers (timers are unique by time)? ---
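The question above can be made concrete with a small stand-alone sketch: the timer state is only ever consulted as "is a cleanup timer pending?", so a boolean flag would be enough, and without any such guard every incoming element would register its own timer. `MockTimerService` and the class names below are illustrative stand-ins, not Flink API.

```scala
import scala.collection.mutable

// Stand-in for ctx.timerService(): records which timers were registered.
class MockTimerService {
  val registered = mutable.Set.empty[Long]
  def registerProcessingTimeTimer(time: Long): Unit = registered += time
}

// Plays the role of the ValueState guarding timer registration.
class TimerGuard(timers: MockTimerService) {
  private var timerPending = false // the only fact the state encodes

  def onElement(now: Long, retention: Long): Unit = {
    if (!timerPending) { // without this guard, every element registers a timer
      timers.registerProcessingTimeTimer(now + retention)
      timerPending = true
    }
  }

  def onTimer(): Unit = timerPending = false // next element may register again
}

val timers = new MockTimerService
val guard = new TimerGuard(timers)
guard.onElement(100L, 10L)
guard.onElement(101L, 10L) // suppressed: a timer is already pending
guard.onElement(105L, 10L) // suppressed as well
println(timers.registered.size) // 1
```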
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115844805 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin + +import org.apache.calcite.rel.core.JoinRelType +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema + +class DataStreamJoinRule + extends ConverterRule( + classOf[FlinkLogicalJoin], + FlinkConventions.LOGICAL, + FlinkConventions.DATASTREAM, + "DataStreamJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] + +val joinInfo = join.analyzeCondition + +// joins require an equi-condition or a conjunctive predicate with at least one equi-condition +// and disable outer joins with non-equality predicates +!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER) --- End diff -- I assume not, because the join type is not passed on to the `DataStreamJoin`. ---
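To make the quoted `matches()` condition explicit, here is a stand-alone restatement of which joins the rule accepts. The `JoinType` values and boolean inputs are simplified stand-ins for Calcite's `JoinRelType` and `JoinInfo`:

```scala
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType

// hasEquiPairs ~ !joinInfo.pairs().isEmpty
// isEquiOnly   ~ joinInfo.isEqui
def matchesRule(hasEquiPairs: Boolean, isEquiOnly: Boolean, joinType: JoinType): Boolean =
  hasEquiPairs && (isEquiOnly || joinType == Inner)

// inner join with equi plus extra non-equi predicates: accepted
println(matchesRule(hasEquiPairs = true, isEquiOnly = false, Inner))     // true
// outer join with a non-equality predicate: rejected
println(matchesRule(hasEquiPairs = true, isEquiOnly = false, LeftOuter)) // false
// no equi-condition at all (cross join): rejected
println(matchesRule(hasEquiPairs = false, isEquiOnly = true, Inner))     // false
```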
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3715#discussion_r115852788 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamTableEnvironment, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +/** + * Flink RelNode which matches along with JoinOperator and its related operations. + */ +class DataStreamJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + joinNode: FlinkLogicalJoin, + leftSchema: RowSchema, + schema: RowSchema, + ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with CommonJoin + with DataStreamRel { + + override def deriveRowType() = schema.logicalType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinNode, + leftSchema, + schema, + ruleDescription) + } + + override def toString: String = { + +s"${joinTypeToString(joinNode.getJoinType)}" + + s"(condition: (${joinConditionToString(schema.logicalType, +joinNode.getCondition, getExpressionString)}), " + + s"select: (${joinSelectionToString(schema.logicalType)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("condition", joinConditionToString(schema.logicalType, 
+joinNode.getCondition, getExpressionString)) + .item("select", joinSelectionToString(schema.logicalType)) + .item("joinType", joinTypeToString(joinNode.getJoinType)) + } + + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + +val config = tableEnv.getConfig + +// get the equality keys and other condition +val (leftKeys, rightKeys, otherCondition) = + JoinUtil.analyzeJoinCondition(joinNode, getExpressionString) + +if (left.isInstanceOf[StreamTableSourceScan] +|| right.isInstanceOf[StreamTableSourceScan]) { + throw new TableException( +"Join between stream and table is not supported yet.") +} +// analyze time boundary and time predicate type(proctime/rowtime) +val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) = + JoinUtil.analyzeTimeBoundary( +otherCondition, +leftSchema.logicalType.getFieldCount, +leftSchema.physicalType.getFieldCount, +schema.logicalType, +joinNode.getCluster.getRexBuilder, +config) + +val leftDataStream = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) +val rightDataStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + +// generate other condition filter function +val filterFunction = +
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r115864982

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala ---
    @@ -0,0 +1,323 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.join
    +
    +import java.util
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.functions.RichFilterFunction
    +import org.apache.flink.api.common.state._
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +/**
    + * A CoProcessFunction to join two streams; currently only inner joins are supported.
    + *
    + * @param leftStreamWindowSize  the window size of the left stream
    + * @param rightStreamWindowSize the window size of the right stream
    + * @param element1Type          the input type of the left stream
    + * @param element2Type          the input type of the right stream
    + * @param filterFunc            the function for the remaining non-equi conditions, including the time condition
    + */
    +class ProcTimeInnerJoin(
    +    private val leftStreamWindowSize: Long,
    +    private val rightStreamWindowSize: Long,
    +    private val element1Type: TypeInformation[CRow],

    --- End diff --

    I think we can use `TypeInformation[Row]` here and don't need the `CRow` wrapper.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
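The reviewer's point, that the join state only needs the inner `Row` and not the `CRow` wrapper, can be sketched in plain Scala. `Row` and `CRow` below are simplified stand-ins for Flink's classes (an assumption for illustration, not Flink's real API):

```scala
// Simplified stand-ins for Flink's Row and CRow (illustration only).
final case class Row(fields: Any*)
final case class CRow(row: Row, change: Boolean)

// A buffer that keys rows by timestamp but stores only the unwrapped Row,
// mirroring the suggestion to declare the MapState with TypeInformation[Row].
final class RowBuffer {
  private val byTime = scala.collection.mutable.Map.empty[Long, List[Row]]

  def add(time: Long, in: CRow): Unit =
    byTime(time) = byTime.getOrElse(time, Nil) :+ in.row

  def rowsAt(time: Long): List[Row] = byTime.getOrElse(time, Nil)
}
```

The change flag only matters when emitting results downstream; keeping it out of the stored state avoids serializing a constant per buffered row.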
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r115865585

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala ---
    @@ -0,0 +1,323 @@
    +class ProcTimeInnerJoin(
    +    private val leftStreamWindowSize: Long,
    +    private val rightStreamWindowSize: Long,
    +    private val element1Type: TypeInformation[CRow],
    +    private val element2Type: TypeInformation[CRow],
    +    private val filterFunc: RichFilterFunction[Row])
    +  extends CoProcessFunction[CRow, CRow, CRow] {
    +
    +  private var outputC: CRow = _
    +  private var listToRemove: JList[Long] = _
    +
    +  /** state to hold left stream element **/
    +  private var row1MapState: MapState[Long, JList[Row]] = _
    +  /** state to hold right stream element **/
    +  private var row2MapState: MapState[Long, JList[Row]] = _
    +
    +  /** state to record last timer of left stream, 0 means no timer **/
    +  private var timerState1: ValueState[Long] = _
    +  /** state to record last timer of right stream, 0 means no timer **/
    +  private var timerState2: ValueState[Long] = _
    +
    +  override def open(config: Configuration) {
    +    outputC = new CRow(new Row(element1Type.getArity + element2Type.getArity), true)
    +    filterFunc.setRuntimeContext(getRuntimeContext)
    +    filterFunc.open(config)
    +
    +    listToRemove = new util.ArrayList[Long]()
    +
    +    // initialize row state
    +    val rowListTypeInfo1: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](element1Type.asInstanceOf[CRowTypeInfo].rowType)
    +    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
    +    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
    +
    +    val rowListTypeInfo2: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](element2Type.asInstanceOf[CRowTypeInfo].rowType)
    +    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
    +    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
    +
    +    // initialize timer state
    +    val valueStateDescriptor1: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
    +    timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
    +
    +    val valueStateDescriptor2: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
    +    timerState2 =
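The `MapState[Long, JList[Row]]` layout quoted above buckets rows by their arrival timestamp so that a timer can drop whole buckets once they fall out of the join window. A self-contained model of that layout, using plain Scala collections instead of Flink state and timers (an assumption for illustration):

```scala
import scala.collection.mutable

// Model of the per-timestamp row buckets: put() appends to the bucket for a
// timestamp; expire() drops every bucket older than (now - windowSize), as a
// processing-time cleanup timer would.
final class TimeBucketedBuffer[T](windowSize: Long) {
  private val buckets = mutable.TreeMap.empty[Long, List[T]]

  def put(time: Long, value: T): Unit =
    buckets(time) = buckets.getOrElse(time, Nil) :+ value

  def expire(now: Long): Unit = {
    val cutoff = now - windowSize
    // TreeMap iterates keys in sorted order, so expired buckets form a prefix.
    val expired = buckets.keysIterator.takeWhile(_ < cutoff).toList
    expired.foreach(buckets.remove)
  }

  def all: List[T] = buckets.valuesIterator.flatten.toList
}
```

Bucketing by timestamp means expiration touches only whole buckets rather than scanning individual rows, which is the point of keying the MapState by time.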
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r115847719

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---
    @@ -0,0 +1,180 @@
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.core.JoinRelType
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.flink.api.common.functions.RichFilterFunction
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.plan.nodes.CommonJoin
    +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.join.{JoinUtil, ProcTimeInnerJoin}
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +import org.apache.flink.types.Row
    +
    +/**
    + * Flink RelNode representing a JoinOperator and its related operations.
    + */
    +class DataStreamJoin(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    leftNode: RelNode,
    +    rightNode: RelNode,
    +    joinNode: FlinkLogicalJoin,

    --- End diff --

    Please do not include a `FlinkLogicalJoin` node; pass the condition and the join type instead.
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r115874222

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---
    @@ -0,0 +1,180 @@
    +class DataStreamJoin(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    leftNode: RelNode,
    +    rightNode: RelNode,
    +    joinNode: FlinkLogicalJoin,
    +    leftSchema: RowSchema,
    +    schema: RowSchema,
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, leftNode, rightNode)
    +  with CommonJoin
    +  with DataStreamRel {
    +
    +  override def deriveRowType() = schema.logicalType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamJoin(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      joinNode,
    +      leftSchema,
    +      schema,
    +      ruleDescription)
    +  }
    +
    +  override def toString: String = {
    +    s"${joinTypeToString(joinNode.getJoinType)}" +
    +      s"(condition: (${joinConditionToString(schema.logicalType,
    +        joinNode.getCondition, getExpressionString)}), " +
    +      s"select: (${joinSelectionToString(schema.logicalType)}))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw)
    +      .item("condition", joinConditionToString(schema.logicalType,
    +        joinNode.getCondition, getExpressionString))
    +      .item("select", joinSelectionToString(schema.logicalType))
    +      .item("joinType", joinTypeToString(joinNode.getJoinType))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
    +
    +    val config = tableEnv.getConfig
    +
    +    // get the equality keys and other condition
    +    val (leftKeys, rightKeys, otherCondition) =
    +      JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)
    +
    +    if (left.isInstanceOf[StreamTableSourceScan]
    +        || right.isInstanceOf[StreamTableSourceScan]) {
    +      throw new TableException(
    +        "Join between stream and table is not supported yet.")
    +    }
    +    // analyze time boundary and time predicate type (proctime/rowtime)
    +    val (timeType, leftStreamWindowSize, rightStreamWindowSize, conditionWithoutTime) =
    +      JoinUtil.analyzeTimeBoundary(
    +        otherCondition,
    +        leftSchema.logicalType.getFieldCount,
    +        leftSchema.physicalType.getFieldCount,
    +        schema.logicalType,
    +        joinNode.getCluster.getRexBuilder,
    +        config)
    +
    +    val leftDataStream = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +    val rightDataStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    // generate other condition filter function
    +    val filterFunction =
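The window sizes produced by the time-boundary analysis follow from the time predicate itself. As a hedged sketch (hypothetical helper names, not the PR's `JoinUtil` API): a predicate of the shape `l.proctime BETWEEN r.proctime - lower AND r.proctime + upper` bounds which rows can still join, and therefore how much history each side must retain:

```scala
// Hypothetical model of the time-boundary idea (not JoinUtil's signature).
object TimeBoundary {
  // A left row at time lt joins a right row at time rt
  // iff rt - lower <= lt <= rt + upper.
  def joins(lt: Long, rt: Long, lower: Long, upper: Long): Boolean =
    lt >= rt - lower && lt <= rt + upper

  // When a right row arrives, matching left rows are at most `lower` old;
  // when a left row arrives, matching right rows are at most `upper` old.
  def windowSizes(lower: Long, upper: Long): (Long, Long) =
    (lower, upper) // (leftStreamWindowSize, rightStreamWindowSize)
}
```

This is why the analysis returns one window size per side: the two bounds of the interval are generally asymmetric.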
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r115847866

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---
    @@ -0,0 +1,180 @@
    +class DataStreamJoin(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    leftNode: RelNode,
    +    rightNode: RelNode,
    +    joinNode: FlinkLogicalJoin,
    +    leftSchema: RowSchema,

    --- End diff --

    Why don't we need the `rightSchema`?
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r115850396

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---
    @@ -0,0 +1,180 @@
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
    +
    +    val config = tableEnv.getConfig
    +
    +    // get the equality keys and other condition
    +    val (leftKeys, rightKeys, otherCondition) =
    +      JoinUtil.analyzeJoinCondition(joinNode, getExpressionString)

    --- End diff --

    This can be done by

    ```
    val joinInfo = JoinInfo.of(leftNode, rightNode, condition)
    val leftKeys: Array[Int] = joinInfo.leftKeys.toIntArray
    val rightKeys: Array[Int] = joinInfo.rightKeys.toIntArray
    val otherCondition = joinInfo.getRemaining(cluster.getRexBuilder)
    ```

    So we do not need a special method for this. The type checks are not required, because Calcite makes sure during validation that only compatible types are compared, so we can be sure that the types are valid.
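The split that `JoinInfo.of` performs can be modeled without Calcite. The tiny AST below is an assumption for illustration (Calcite's `RexNode` conditions are far richer), but it shows the shape of the result: equi-join key index arrays plus the remaining non-equi predicates:

```scala
// Minimal stand-in for a conjunctive join condition (not Calcite's RexNode).
sealed trait Conjunct
final case class EquiKey(leftField: Int, rightField: Int) extends Conjunct
final case class Residual(predicate: String) extends Conjunct

object JoinConditionModel {
  // Split a conjunction into equi-key index pairs and the remaining predicates,
  // analogous to JoinInfo.leftKeys / rightKeys / getRemaining.
  def split(conjuncts: List[Conjunct]): (Array[Int], Array[Int], List[Residual]) = {
    val equi = conjuncts.collect { case e: EquiKey => e }
    val rest = conjuncts.collect { case r: Residual => r }
    (equi.map(_.leftField).toArray, equi.map(_.rightField).toArray, rest)
  }
}
```

The equi-keys drive stream partitioning, while the residual (including the time condition) becomes the generated filter function.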
[GitHub] flink pull request #3715: [FLINK-6232][Table]Support proctime inner equi...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r115875501

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeInnerJoin.scala ---
    @@ -0,0 +1,323 @@
    +    val valueStateDescriptor2: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
    +    timerState2 =
[jira] [Comment Edited] (FLINK-5536) Config option: HA
[ https://issues.apache.org/jira/browse/FLINK-5536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005585#comment-16005585 ]

Stavros Kontopoulos edited comment on FLINK-5536 at 5/10/17 10:43 PM:
----------------------------------------------------------------------

[~eronwright]
https://github.com/mesosphere/dcos-flink-service/pull/23
https://github.com/mesosphere/universe/pull/1163

was (Author: skonto):
https://github.com/mesosphere/dcos-flink-service/pull/23
https://github.com/mesosphere/universe/pull/1163

> Config option: HA
> -----------------
>
>                 Key: FLINK-5536
>                 URL: https://issues.apache.org/jira/browse/FLINK-5536
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Mesos
>            Reporter: Eron Wright
>            Assignee: Stavros Kontopoulos
>
> Configure Flink HA through package options plus good defaults. The main
> components are ZK configuration and state backend configuration.
> - The ZK information can be defaulted to `master.mesos` as with other packages.
> - Evaluate whether ZK can be fully configured by default, even if a state
>   backend isn't configured.
> - Use DCOS HDFS as the filesystem for the state backend. Evaluate whether to
>   assume that DCOS HDFS is installed by default, or whether to make it explicit.
> - To use DCOS HDFS, the init script should download the core-site.xml and
>   hdfs-site.xml from the HDFS 'connection' endpoint. Supply a default value
>   for the endpoint address; see
>   [https://docs.mesosphere.com/service-docs/hdfs/connecting-clients/].

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5536) Config option: HA
[ https://issues.apache.org/jira/browse/FLINK-5536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005585#comment-16005585 ]

Stavros Kontopoulos commented on FLINK-5536:
--------------------------------------------

https://github.com/mesosphere/dcos-flink-service/pull/23
https://github.com/mesosphere/universe/pull/1163

> Config option: HA
> -----------------
>
>                 Key: FLINK-5536
>                 URL: https://issues.apache.org/jira/browse/FLINK-5536
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Mesos
>            Reporter: Eron Wright
>            Assignee: Stavros Kontopoulos
>
> Configure Flink HA through package options plus good defaults. The main
> components are ZK configuration and state backend configuration.
> - The ZK information can be defaulted to `master.mesos` as with other packages.
> - Evaluate whether ZK can be fully configured by default, even if a state
>   backend isn't configured.
> - Use DCOS HDFS as the filesystem for the state backend. Evaluate whether to
>   assume that DCOS HDFS is installed by default, or whether to make it explicit.
> - To use DCOS HDFS, the init script should download the core-site.xml and
>   hdfs-site.xml from the HDFS 'connection' endpoint. Supply a default value
>   for the endpoint address; see
>   [https://docs.mesosphere.com/service-docs/hdfs/connecting-clients/].
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005509#comment-16005509 ]

ASF GitHub Bot commented on FLINK-6225:
---------------------------------------

Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3748#discussion_r115861002

    --- Diff: flink-connectors/flink-connector-cassandra/pom.xml ---
    @@ -176,5 +176,23 @@ under the License.
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-table_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>provided</scope>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-scala_2.10</artifactId>

    --- End diff --

    This is interesting: `flink-table_2.10` no longer pulls in `flink-streaming-scala_2.10` as a transitive dependency. Therefore the change needs to introduce the dependency in order to compile anything against the `flink-streaming-scala` module (e.g., `StreamExecutionEnvironment`).

> Support Row Stream for CassandraSink
> ------------------------------------
>
>                 Key: FLINK-6225
>                 URL: https://issues.apache.org/jira/browse/FLINK-6225
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cassandra Connector
>    Affects Versions: 1.3.0
>            Reporter: Jing Fan
>            Assignee: Haohui Mai
>             Fix For: 1.3.0
>
> Currently in CassandraSink, specifying query is not supported for row-stream.
> The solution should be similar to CassandraTupleSink.
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3748#discussion_r115861002

    --- Diff: flink-connectors/flink-connector-cassandra/pom.xml ---
    @@ -176,5 +176,23 @@ under the License.
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-table_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>provided</scope>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-scala_2.10</artifactId>

    --- End diff --

    This is interesting: `flink-table_2.10` no longer pulls in `flink-streaming-scala_2.10` as a transitive dependency. Therefore the change needs to introduce the dependency in order to compile anything against the `flink-streaming-scala` module (e.g., `StreamExecutionEnvironment`).
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005494#comment-16005494 ]

ASF GitHub Bot commented on FLINK-6225:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3748#discussion_r115859400

    --- Diff: flink-connectors/flink-connector-cassandra/pom.xml ---
    @@ -176,5 +176,23 @@ under the License.
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-table_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>provided</scope>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-scala_2.10</artifactId>

    --- End diff --

    Do we actually depend on the scala dependencies?

> Support Row Stream for CassandraSink
> ------------------------------------
>
>                 Key: FLINK-6225
>                 URL: https://issues.apache.org/jira/browse/FLINK-6225
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cassandra Connector
>    Affects Versions: 1.3.0
>            Reporter: Jing Fan
>            Assignee: Haohui Mai
>             Fix For: 1.3.0
>
> Currently in CassandraSink, specifying query is not supported for row-stream.
> The solution should be similar to CassandraTupleSink.
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r115859400 --- Diff: flink-connectors/flink-connector-cassandra/pom.xml --- End diff -- Do we actually depend on the scala dependencies?
[jira] [Commented] (FLINK-6531) Deserialize checkpoint hooks with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005401#comment-16005401 ] ASF GitHub Bot commented on FLINK-6531: --- Github user EronWright closed the pull request at: https://github.com/apache/flink/pull/3867 > Deserialize checkpoint hooks with user classloader > -- > > Key: FLINK-6531 > URL: https://issues.apache.org/jira/browse/FLINK-6531 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Blocker > Fix For: 1.3.0 > > > The checkpoint hooks introduced in FLINK-6390 aren't being deserialized with > the user classloader, breaking remote execution. > Remote execution produces a `ClassNotFoundException` as the job graph is > transferred from the client to the JobManager.
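[Editorial note] The fix discussed in FLINK-6531 — deserializing the checkpoint hooks against the user classloader rather than the system one — follows a standard Java pattern. Below is a hedged sketch of that pattern; the class name `ClassLoaderObjectInputStream` is invented for illustration and is not Flink's actual implementation:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// An ObjectInputStream that resolves serialized class names against a
// supplied classloader (e.g. the user-code classloader), so that classes
// shipped in the user's job jar can be found during deserialization.
class ClassLoaderObjectInputStream extends ObjectInputStream {
    private final ClassLoader classLoader;

    ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            // Look the class up in the user classloader first.
            return Class.forName(desc.getName(), false, classLoader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default resolution for bootstrap classes.
            return super.resolveClass(desc);
        }
    }
}
```

Without such resolution, a class that exists only in the user jar triggers exactly the `ClassNotFoundException` described in the issue when the job graph is deserialized on the JobManager.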
[GitHub] flink pull request #3867: [FLINK-6531] Deserialize checkpoint hooks with use...
Github user EronWright closed the pull request at: https://github.com/apache/flink/pull/3867
[GitHub] flink issue #3868: [FLINK-6532] [checkpoints] Ensure proper classloading for...
Github user EronWright commented on the issue: https://github.com/apache/flink/pull/3868 The title should refer to FLINK-6531
[jira] [Commented] (FLINK-6531) Deserialize checkpoint hooks with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005364#comment-16005364 ] ASF GitHub Bot commented on FLINK-6531: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3867 @EronWright Here is a slightly modified version of your code: https://github.com/apache/flink/pull/3868 I added a small test to also validate the class loading.
[GitHub] flink issue #3867: [FLINK-6531] Deserialize checkpoint hooks with user class...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3867 @EronWright Here is a slightly modified version of your code: https://github.com/apache/flink/pull/3868 I added a small test to also validate the class loading.
[GitHub] flink pull request #3868: [FLINK-6532] [checkpoints] Ensure proper classload...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/3868 [FLINK-6532] [checkpoints] Ensure proper classloading for user-defined checkpoint hooks This also adds a test that validates that the correct class loader is passed. Note the neat trick via `CommonTestUtils.createObjectForClassNotInClassPath(classLoader)` to conjure up an object that is not in the test's class path. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink hook_fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3868.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3868 commit 8f6e7ab9f1fa96c24dff44f75551fec968d657c6 Author: Stephan Ewen Date: 2017-05-10T20:03:49Z [FLINK-6532] [checkpoints] Ensure proper classloading for user-defined checkpoint hooks
[jira] [Commented] (FLINK-6529) Rework the shading model in Flink
[ https://issues.apache.org/jira/browse/FLINK-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005350#comment-16005350 ] Haohui Mai commented on FLINK-6529: --- That makes sense, as we are being hit by this problem quite hard (especially guava). I can take this up.
> Rework the shading model in Flink
> Key: FLINK-6529
> URL: https://issues.apache.org/jira/browse/FLINK-6529
> Project: Flink
> Issue Type: Bug
> Components: Build System
> Affects Versions: 1.3.0, 1.2.1
> Reporter: Stephan Ewen
> Assignee: Haohui Mai
> Priority: Critical
>
> h2. Problem
> Currently, Flink shades dependencies like ASM and Guava into all jars of projects that reference them and relocates the classes.
> There are some drawbacks to that approach; let's discuss them using the example of ASM:
> - The ASM classes are, for example, in {{flink-core}}, {{flink-java}}, {{flink-scala}}, {{flink-runtime}}, etc.
> - Users that reference these dependencies have the classes multiple times in the classpath. That is unclean (it works, though, because the classes are identical). The same happens when building the final dist. jar.
> - Some of these dependencies require including license files in the shaded jar. It is hard, if not impossible, to build a good automatic solution for that, partly due to Maven's very poor cross-project path support.
> - Scala does not support shading really well. Scala classes have references to classes in more places than just the class names (apparently for Scala reflect support). Referencing a Scala project with shaded ASM still requires adding a reference to unshaded ASM (at least as a compile dependency).
> h2. Proposal
> I propose that we build and deploy an {{asm-flink-shaded}} version of ASM and directly program against the relocated namespaces. Since we never use classes that we relocate in public interfaces, Flink users will never see the relocated class names. Internally, it does not hurt to use them.
> - Proper maven dependency management, no hidden (shaded) dependencies
> - one copy of each dependency
> - proper Scala interoperability
> - no clumsy license management (license is in the deployed {{asm-flink-shaded}})
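[Editorial note] The proposal above — a dedicated `asm-flink-shaded` artifact with relocated packages, built once and depended on normally — would typically be wired up with the maven-shade-plugin. The fragment below is a hedged sketch only; the relocation pattern and module layout are assumptions, not Flink's actual build configuration:

```xml
<!-- In the hypothetical asm-flink-shaded module's pom.xml: relocate ASM
     exactly once, so all other modules can program directly against the
     relocated package without shading anything themselves. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>org.objectweb.asm</pattern>
            <shadedPattern>org.apache.flink.shaded.asm</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Downstream modules would then declare a plain dependency on `asm-flink-shaded` and import `org.apache.flink.shaded.asm.*` directly, which sidesteps the Scala-reflection problem described above.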
[jira] [Assigned] (FLINK-6529) Rework the shading model in Flink
[ https://issues.apache.org/jira/browse/FLINK-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haohui Mai reassigned FLINK-6529: - Assignee: Haohui Mai
> Rework the shading model in Flink
[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin
[ https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005325#comment-16005325 ] ASF GitHub Bot commented on FLINK-4499: --- Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2422 Hadoop has switched to spotbugs > Introduce findbugs maven plugin > --- > > Key: FLINK-4499 > URL: https://issues.apache.org/jira/browse/FLINK-4499 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Assignee: Suneel Marthi > > As suggested by Stephan in FLINK-4482, this issue is to add > findbugs-maven-plugin into the build process so that we can detect lack of > proper locking and other defects automatically. > We can begin with small set of rules.
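[Editorial note] For context, the findbugs-maven-plugin mentioned in FLINK-4499 is typically enabled along the following lines. This is a sketch, not the configuration from the PR; the version number and thresholds are illustrative assumptions:

```xml
<!-- Hypothetical wiring of findbugs-maven-plugin into a Maven build.
     "check" fails the build on detected bugs at or above the threshold,
     which matches the goal of catching locking defects automatically. -->
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>findbugs-maven-plugin</artifactId>
  <version>3.0.4</version>
  <configuration>
    <effort>Default</effort>
    <threshold>High</threshold>
    <failOnError>true</failOnError>
  </configuration>
  <executions>
    <execution>
      <goals>
        <goal>check</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

Starting with a high threshold keeps the initial rule set small, as the issue suggests, and the threshold can be lowered as warnings are cleaned up.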
[GitHub] flink issue #2422: FLINK-4499: [WIP] Introduce findbugs maven plugin
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2422 Hadoop has switched to spotbugs
[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin
[ https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005283#comment-16005283 ] ASF GitHub Bot commented on FLINK-4499: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2422 We would need INFRA to enable a daily cron build for Flink. How will we use FindBugs (which may be dead)? Is a goal to export the site artifacts to S3?
[GitHub] flink issue #2422: FLINK-4499: [WIP] Introduce findbugs maven plugin
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2422 We would need INFRA to enable a daily cron build for Flink. How will we use FindBugs (which may be dead)? Is a goal to export the site artifacts to S3?
[jira] [Commented] (FLINK-6531) Deserialize checkpoint hooks with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005281#comment-16005281 ] ASF GitHub Bot commented on FLINK-6531: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3867 Thanks for this patch, this is crucial! There is a slightly different approach I would suggest, to not assume that the `Factory` is always in the core classpath. I have the code for that almost ready, plus a test. Can open a PR for your review shortly...
[GitHub] flink issue #3867: [FLINK-6531] Deserialize checkpoint hooks with user class...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3867 Thanks for this patch, this is crucial! There is a slightly different approach I would suggest, to not assume that the `Factory` is always in the core classpath. I have the code for that almost ready, plus a test. Can open a PR for your review shortly...
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005279#comment-16005279 ] ASF GitHub Bot commented on FLINK-6075: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3714 @fhueske As at some point there were some other issues open on this i just wanted to ping you on the right :)
> Support Limit/Top(Sort) for Stream SQL
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: radu
> Labels: features
> Attachments: sort.png
>
> These will be split into 3 separate JIRA issues. However, the design is the same; only the processing function differs in terms of the output. Hence, the design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) ORDER BY b`
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING LIMIT 10) FROM stream1`
> Comment: limit over the contents of the sliding window
> General Comments:
> - All these SQL clauses are supported only over windows (bounded collections of data).
> - Each of the 3 operators will be supported with each of the types of expressing the windows.
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior, as they all require a sorted collection of the data on which the logic will be applied (i.e., select a subset of the items or the entire sorted set). These functions make sense in the streaming context only within a window; without a window, the functions could never emit, as the sort operation would never trigger. If an SQL query is provided without such bounds, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR).
> Although not targeted by this JIRA, when working based on event-time order, the retraction mechanisms of windows and the lateness mechanisms can be used to deal with out-of-order events and retractions/updates of results.
> **Functionality example**
> We exemplify with the query below for all 3 types of operators (sorting, limit and top). Rowtime indicates when the HOP window will trigger, which can be observed in the fact that outputs are generated only at those moments. The HOP windows will trigger at every hour (fixed hour) and each event will contribute to / be duplicated for 2 consecutive hour intervals. Proctime indicates the processing time when a new event arrives in the system. Events are of the type (a,b), with the ordering being applied on the b field.
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) ORDER BY b (LIMIT 2 / TOP 2 / [ASC/DESC])`
> ||Rowtime||Proctime||Stream1||Limit 2||Top 2||Sort [ASC]||
> | |10:00:00|(aaa, 11)| | | |
> | |10:05:00|(aab, 7)| | | |
> |10-11|11:00:00| |aab,aaa|aab,aaa|aab,aaa|
> | |11:03:00|(aac,21)| | | |
> |11-12|12:00:00| |aab,aaa|aab,aaa|aab,aaa,aac|
> | |12:10:00|(abb,12)| | | |
> | |12:15:00|(abb,12)| | | |
> |12-13|13:00:00| |abb,abb|abb,abb|abb,abb,aac|
> |...|
> **Implementation option**
> Considering that the SQL operators will be associated with window boundaries, the functionality will be implemented within the logic of the window as follows:
> * Window assigner – selected based on the type of window used in SQL (TUMBLING, SLIDING…)
> * Evictor/Trigger – time or count evictor based on the definition of the window boundaries
> * Apply – window function that sorts data and selects the output to trigger (based on LIMIT/TOP parameters). All data will be sorted at once and the result outputted when the window is triggered.
> An alternative implementation can be to use a fold window function to sort the elements as they arrive, one at a time, followed by a flatMap
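[Editorial note] The "Apply" step described in the implementation option — buffer the window's (a,b) records, sort on field b, keep only the LIMIT/TOP n rows, project field a — can be sketched in plain Java. This is a hypothetical illustration of the per-window logic only; `Event` and `sortAndLimit` are invented stand-ins for Flink's row type and window function, not the PR's actual code:

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Per-window sort + limit, as a standalone function over the buffered
// elements of one triggered window.
class WindowTopN {
    static final class Event {
        final String a;
        final int b;
        Event(String a, int b) { this.a = a; this.b = b; }
    }

    // ORDER BY b ASC, then LIMIT/TOP n, then SELECT a.
    static List<String> sortAndLimit(List<Event> window, int limit) {
        return window.stream()
                .sorted(Comparator.comparingInt((Event e) -> e.b))
                .limit(limit)
                .map(e -> e.a)
                .collect(Collectors.toList());
    }
}
```

For the 10-11 window in the table above (events (aaa, 11) and (aab, 7)), this yields aab before aaa, matching the "Limit 2" column.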
[GitHub] flink issue #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream SQL
Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3714 @fhueske As at some point there were some other issues open on this i just wanted to ping you on the right :)