[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142851216

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricId.java ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
--- End diff --

moved to `org.apache.flink.runtime.rest.messages.metrics`

---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142851091

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java ---
@@ -0,0 +1,28 @@
+[standard Apache License header]
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+/**
+ *
+ */
+public interface JobMetricsOverview extends ResponseBody {
--- End diff --

renamed

---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142850939

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDQueryParameter.java ---
@@ -0,0 +1,41 @@
+[standard Apache License header]
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ *
+ */
+public class JobIDQueryParameter extends MessageQueryParameter {
--- End diff --

renamed

---
[GitHub] flink pull request #4757: [Flink 7694][REST][Webfrontend]Port JobMetricsHand...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r142850915

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobMetricsOverview.java ---
@@ -0,0 +1,28 @@
+[standard Apache License header]
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
--- End diff --

moved to `org.apache.flink.runtime.rest.messages.metrics`

---
[jira] [Updated] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails
[ https://issues.apache.org/jira/browse/FLINK-7488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-7488:
--------------------------
Description:
{code}
compareNetworkBufShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)  Time elapsed: 0.239 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different network buffer memory sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]104857600> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600>
	at org.junit.Assert.assertEquals(Assert.java:115)
	at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:235)
	at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:81)

compareHeapSizeShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)  Time elapsed: 0.16 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different heap sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: [same Hadoop classpath as above]]1000>
	at org.junit.Assert.assertEquals(Assert.java:115)
	at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:275)
	at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:110)
{code}
$HADOOP_CONF_DIR was not set prior to running the test.
[jira] [Updated] (FLINK-7495) AbstractUdfStreamOperator#initializeState() should be called in AsyncWaitOperator#initializeState()
[ https://issues.apache.org/jira/browse/FLINK-7495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-7495:
--------------------------
Description:
{code}
recoveredStreamElements = context
    .getOperatorStateStore()
    .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
{code}
A call to AbstractUdfStreamOperator#initializeState() should be added at the beginning.

> AbstractUdfStreamOperator#initializeState() should be called in
> AsyncWaitOperator#initializeState()
> ---
>
> Key: FLINK-7495
> URL: https://issues.apache.org/jira/browse/FLINK-7495
> Project: Flink
> Issue Type: Bug
> Reporter: Ted Yu
> Assignee: Fang Yong
> Priority: Minor
>
> {code}
> recoveredStreamElements = context
>     .getOperatorStateStore()
>     .getListState(new ListStateDescriptor<>(STATE_NAME,
>     inStreamElementSerializer));
> {code}
> A call to AbstractUdfStreamOperator#initializeState() should be added at the
> beginning

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
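The proposed fix can be sketched as follows. This is hypothetical stand-in code, not Flink's actual classes (the real StateInitializationContext and operator APIs are much richer); it only illustrates the point of the issue: the overriding operator must delegate to the superclass before restoring its own state.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for Flink's state initialization context.
class StateInitializationContext {
    final List<String> recovered = new ArrayList<>();
}

// Stand-in for AbstractUdfStreamOperator: its initializeState restores the
// user function's checkpointed state, so subclasses must not skip it.
class AbstractUdfStreamOperator {
    boolean udfStateInitialized = false;

    void initializeState(StateInitializationContext context) {
        udfStateInitialized = true;
    }
}

// Stand-in for AsyncWaitOperator with the fix applied.
class AsyncWaitOperator extends AbstractUdfStreamOperator {
    List<String> recoveredStreamElements;

    @Override
    void initializeState(StateInitializationContext context) {
        super.initializeState(context); // the missing call the issue asks for
        recoveredStreamElements = context.recovered;
    }
}

public class Main {
    public static void main(String[] args) {
        AsyncWaitOperator op = new AsyncWaitOperator();
        op.initializeState(new StateInitializationContext());
        System.out.println("udfStateInitialized=" + op.udfStateInitialized);
    }
}
```

Without the `super.initializeState(context)` call, the UDF's own state hooks would silently never run.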
[jira] [Updated] (FLINK-7588) Document RocksDB tuning for spinning disks
[ https://issues.apache.org/jira/browse/FLINK-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7588: -- Component/s: Documentation > Document RocksDB tuning for spinning disks > -- > > Key: FLINK-7588 > URL: https://issues.apache.org/jira/browse/FLINK-7588 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Ted Yu > > In docs/ops/state/large_state_tuning.md , it was mentioned that: > bq. the default configuration is tailored towards SSDs and performs > suboptimal on spinning disks > We should add recommendation targeting spinning disks: > https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7679) Upgrade maven enforcer plugin to 3.0.0-M1
[ https://issues.apache.org/jira/browse/FLINK-7679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-7679:
--------------------------
Description:
I got the following build error against Java 9:
{code}
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce (enforce-maven) on project flink-parent: Execution enforce-maven of goal org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce failed: An API incompatibility was encountered while executing org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce: java.lang.ExceptionInInitializerError: null
[ERROR] -
[ERROR] realm = plugin>org.apache.maven.plugins:maven-enforcer-plugin:1.4.1
[ERROR] strategy = org.codehaus.plexus.classworlds.strategy.SelfFirstStrategy
[ERROR] urls[0] = file:/home/hbase/.m2/repository/org/apache/maven/plugins/maven-enforcer-plugin/1.4.1/maven-enforcer-plugin-1.4.1.jar
{code}
Upgrading the maven enforcer plugin to 3.0.0-M1 gets past the above error.
> Upgrade maven enforcer plugin to 3.0.0-M1 > - > > Key: FLINK-7679 > URL: https://issues.apache.org/jira/browse/FLINK-7679 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > > I got the following build error against Java 9: > {code} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce (enforce-maven) > on project flink-parent: Execution enforce-maven of goal > org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce failed: An API > incompatibility was encountered while executing > org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce: > java.lang.ExceptionInInitializerError: null > [ERROR] - > [ERROR] realm =plugin>org.apache.maven.plugins:maven-enforcer-plugin:1.4.1 > [ERROR] strategy = org.codehaus.plexus.classworlds.strategy.SelfFirstStrategy > [ERROR] urls[0] = > file:/home/hbase/.m2/repository/org/apache/maven/plugins/maven-enforcer-plugin/1.4.1/maven-enforcer-plugin-1.4.1.jar > {code} > Upgrading maven enforcer plugin to 3.0.0-M1 would get over the above error. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
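The upgrade itself amounts to a one-line version bump. A minimal pom.xml fragment as a sketch (the actual change belongs wherever Flink's parent POM configures the plugin):

```xml
<!-- Pin the enforcer plugin to the Java-9-compatible milestone release. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <version>3.0.0-M1</version>
</plugin>
```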
[jira] [Updated] (FLINK-7642) Upgrade maven surefire plugin to 2.19.1
[ https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-7642:
--------------------------
Description:
The Surefire 2.19 release introduced more useful test filters which would let us run a subset of the tests.
This issue is for upgrading the maven surefire plugin to 2.19.1.

> Upgrade maven surefire plugin to 2.19.1
> ---
>
> Key: FLINK-7642
> URL: https://issues.apache.org/jira/browse/FLINK-7642
> Project: Flink
> Issue Type: Improvement
> Reporter: Ted Yu
>
> The Surefire 2.19 release introduced more useful test filters which would let
> us run a subset of the tests.
> This issue is for upgrading the maven surefire plugin to 2.19.1.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
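For illustration, the kind of filtering Surefire 2.19 enables from the command line (the class and method names below are examples, not an endorsement of specific Flink tests):

```shell
# Run a single test method of one class
mvn test -Dtest=TaskManagerHeapSizeCalculationJavaBashTest#compareHeapSizeShellScriptWithJava

# Run methods matching patterns, joined with '+', across matching classes
mvn test "-Dtest=BlobCache*Test#testRegister*+testRelease*"
```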
[jira] [Commented] (FLINK-7743) Remove the restriction of minimum memory of JM
[ https://issues.apache.org/jira/browse/FLINK-7743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192389#comment-16192389 ] mingleizhang commented on FLINK-7743: - +1 > Remove the restriction of minimum memory of JM > -- > > Key: FLINK-7743 > URL: https://issues.apache.org/jira/browse/FLINK-7743 > Project: Flink > Issue Type: Bug >Reporter: Haohui Mai >Assignee: Haohui Mai > > Per discussion on > http://mail-archives.apache.org/mod_mbox/flink-user/201709.mbox/%3c4f77255e-1ddb-4e99-a667-73941b110...@apache.org%3E > It might be great to remove the restriction of the minimum heap size of the > JM. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7261) avoid unnecessary exceptions in the logs in non-HA cases
[ https://issues.apache.org/jira/browse/FLINK-7261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192158#comment-16192158 ] ASF GitHub Bot commented on FLINK-7261: --- Github user NicoK closed the pull request at: https://github.com/apache/flink/pull/4402 > avoid unnecessary exceptions in the logs in non-HA cases > > > Key: FLINK-7261 > URL: https://issues.apache.org/jira/browse/FLINK-7261 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > {{PermanentBlobCache#getHAFileInternal}} first tries to download files from > the HA store but if it does not exist, it will log an exception from the > attempt to move the {{incomingFile}} to its destination which is misleading > to the user. > We should extend {{BlobView#get}} to return whether a file was actually > copied or not, e.g. in the {{VoidBlobStore}} to keep the abstraction of the > BLOB stores but to not report errors in expected cases (recall that > {{FileSystemBlobStore#get}} will already throw an exception if anything > failed in there and if successful but the succeeding move fails the exception > from the move should still be prevailed). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7261) avoid unnecessary exceptions in the logs in non-HA cases
[ https://issues.apache.org/jira/browse/FLINK-7261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192157#comment-16192157 ] ASF GitHub Bot commented on FLINK-7261: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4402 will be fixed by #4358 > avoid unnecessary exceptions in the logs in non-HA cases > > > Key: FLINK-7261 > URL: https://issues.apache.org/jira/browse/FLINK-7261 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > {{PermanentBlobCache#getHAFileInternal}} first tries to download files from > the HA store but if it does not exist, it will log an exception from the > attempt to move the {{incomingFile}} to its destination which is misleading > to the user. > We should extend {{BlobView#get}} to return whether a file was actually > copied or not, e.g. in the {{VoidBlobStore}} to keep the abstraction of the > BLOB stores but to not report errors in expected cases (recall that > {{FileSystemBlobStore#get}} will already throw an exception if anything > failed in there and if successful but the succeeding move fails the exception > from the move should still be prevailed). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
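The idea in the description, extending BlobView#get to report whether a file was actually copied, can be sketched with hypothetical stand-in types (the real Flink interfaces take different parameters):

```java
import java.io.File;
import java.io.IOException;

// Stand-in for BlobView: get(...) now reports whether the blob was copied,
// so "not found in the HA store" is a normal result, not a logged exception.
interface BlobView {
    /** Returns true if the blob was copied to localFile, false if the store has no data. */
    boolean get(String jobId, String key, File localFile) throws IOException;
}

// Stand-in for VoidBlobStore, the non-HA no-op store.
class VoidBlobStore implements BlobView {
    @Override
    public boolean get(String jobId, String key, File localFile) {
        return false; // nothing stored, nothing copied -- not an error
    }
}

public class Main {
    public static void main(String[] args) throws IOException {
        BlobView store = new VoidBlobStore();
        boolean copied = store.get("job-1", "blob-key", new File("/tmp/blob"));
        System.out.println("copied=" + copied);
    }
}
```

The caller can then skip the move-to-destination step (and its misleading exception) whenever `get` returns false, while FileSystemBlobStore keeps throwing on genuine failures.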
[GitHub] flink issue #4402: [FLINK-7261][blob] extend BlobStore#get/put with boolean ...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4402 will be fixed by #4358 ---
[GitHub] flink pull request #4402: [FLINK-7261][blob] extend BlobStore#get/put with b...
Github user NicoK closed the pull request at: https://github.com/apache/flink/pull/4402 ---
[jira] [Commented] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration
[ https://issues.apache.org/jira/browse/FLINK-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192156#comment-16192156 ] ASF GitHub Bot commented on FLINK-7483: --- Github user NicoK closed the pull request at: https://github.com/apache/flink/pull/4568 > BlobCache cleanup timer not reset after job re-registration > --- > > Key: FLINK-7483 > URL: https://issues.apache.org/jira/browse/FLINK-7483 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and > {{releaseJob}} calls where the latter sets a cleanup interval. > {{registerJob}}, however, forgets to reset this if the job is re-registered > again and so the job's blobs will be cleaned up although it is still used! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration
[ https://issues.apache.org/jira/browse/FLINK-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192155#comment-16192155 ] ASF GitHub Bot commented on FLINK-7483: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4568 will be fixed by #4358 > BlobCache cleanup timer not reset after job re-registration > --- > > Key: FLINK-7483 > URL: https://issues.apache.org/jira/browse/FLINK-7483 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and > {{releaseJob}} calls where the latter sets a cleanup interval. > {{registerJob}}, however, forgets to reset this if the job is re-registered > again and so the job's blobs will be cleaned up although it is still used! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
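The bug and its fix can be illustrated with a self-contained sketch (hypothetical stand-in code; the real BlobCache tracks per-job reference counts and a cleanup timer rather than a plain map):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the FLINK-7483 fix: re-registering a job must clear any cleanup
// deadline that a previous releaseJob() call scheduled.
class BlobCacheSketch {
    private static final class Entry { int refCount; long cleanupDeadline = -1; }
    private final Map<String, Entry> jobs = new HashMap<>();

    void registerJob(String jobId) {
        Entry e = jobs.computeIfAbsent(jobId, k -> new Entry());
        e.refCount++;
        e.cleanupDeadline = -1; // the missing reset: cancel any pending cleanup
    }

    void releaseJob(String jobId, long now, long ttlMillis) {
        Entry e = jobs.get(jobId);
        if (e != null && --e.refCount == 0) {
            e.cleanupDeadline = now + ttlMillis; // schedule cleanup
        }
    }

    boolean cleanupPending(String jobId) {
        Entry e = jobs.get(jobId);
        return e != null && e.cleanupDeadline >= 0;
    }
}

public class Main {
    public static void main(String[] args) {
        BlobCacheSketch cache = new BlobCacheSketch();
        cache.registerJob("job-1");
        cache.releaseJob("job-1", 0L, 60_000L); // schedules cleanup
        cache.registerJob("job-1");             // re-registration resets it
        System.out.println("cleanupPending=" + cache.cleanupPending("job-1"));
    }
}
```

Without the reset in `registerJob`, the deadline from the earlier `releaseJob` would survive re-registration and the job's blobs would be cleaned up while still in use.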
[GitHub] flink pull request #4568: [FLINK-7483][blob] prevent cleanup of re-registere...
Github user NicoK closed the pull request at: https://github.com/apache/flink/pull/4568 ---
[GitHub] flink issue #4568: [FLINK-7483][blob] prevent cleanup of re-registered jobs
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4568 will be fixed by #4358 ---
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192153#comment-16192153 ] ASF GitHub Bot commented on FLINK-7068: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4358 Rebased and extended the PR as requested - the last two commits contain the changes compared to the last review. I tried to clean up some of the commits for a better merge but please note that this PR also includes #4568, #4238 and #4402 fixes and commits. For some of those, changes were applied afterwards in the review process which will cause conflicts in their respective PRs that I will close (fixed after merging this PR). > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
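The split described in the issue, permanent blobs that live for the whole job lifetime versus transient blobs (logs, RPC payloads) that readers may discard, can be sketched as a pair of interfaces. The names follow the issue; the signatures are invented stand-ins, not Flink's actual API:

```java
// Common read access to BLOBs.
interface BlobService {
    byte[] get(String key);
}

// Permanent blobs: cleaned up only when the owning job is released (HA and non-HA).
interface PermanentBlobService extends BlobService {
}

// Transient blobs: one-shot payloads the reader is allowed to delete.
interface TransientBlobService extends BlobService {
    boolean delete(String key);
}

public class Main {
    public static void main(String[] args) {
        TransientBlobService transientBlobs = new TransientBlobService() {
            public byte[] get(String key) { return new byte[0]; }
            public boolean delete(String key) { return true; }
        };
        System.out.println("deleted=" + transientBlobs.delete("log-blob"));
    }
}
```

Splitting the interfaces lets the type system enforce that permanent blobs are never deleted through the transient code path.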
[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4358 Rebased and extended the PR as requested - the last two commits contain the changes compared to the last review. I tried to clean up some of the commits for a better merge but please note that this PR also includes #4568, #4238 and #4402 fixes and commits. For some of those, changes were applied afterwards in the review process which will cause conflicts in their respective PRs that I will close (fixed after merging this PR). ---
[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192108#comment-16192108 ]

ASF GitHub Bot commented on FLINK-7657:
---
Github user kmurra commented on the issue: https://github.com/apache/flink/pull/4746

Regarding the time zones, I think I understand your argument here. Is there anything in particular that you would want me to change overall that you haven't already outlined to account for that? I do want to document why we're doing any conversions of time zones, since it took me some time to understand why it was being done (it looked incorrect to myself and several other developers at first glance). Also, I noticed that Calcite's fromCalendarFields simply takes the fields directly from the Calendar, so making time-zone adjustments is unnecessary after the changes I made to toRexNode. I'll fix that as well in my next commit.

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.1, 1.3.2
> Reporter: Kent Murra
> Assignee: Kent Murra
> Priority: Critical
>
> I have a SQL statement using the Tables API that has a timestamp in it. When
> the execution environment tries to optimize the SQL, it causes an exception
> (attached below). The result is that any SQL query with a timestamp, date, or
> time literal is unexecutable if any table source is marked with
> FilterableTableSource.
> {code:none} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule PushFilterIntoTableSourceScanRule, args > [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, > $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], > fields:(data, last_updated))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) > at > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266) > at > org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328) > at > org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135) > at org.apache.flink.table.api.Table.writeToSink(table.scala:800) > at org.apache.flink.table.api.Table.writeToSink(table.scala:773) > at > com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27) > at > com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at > com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22) > at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala) > Caused by: 
java.lang.ClassCastException: java.util.GregorianCalendar cannot > be cast to java.util.Date > at > org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107) > at > org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80) > at > org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35) > at > org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at > org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35) > at >
[GitHub] flink issue #4746: [FLINK-7657] [Table] Adding logic to convert RexLiteral t...
Github user kmurra commented on the issue: https://github.com/apache/flink/pull/4746

Regarding the time zones, I think I understand your argument here. Is there anything in particular that you would want me to change overall that you haven't already outlined to account for that? I do want to document why we're doing any conversions of time zones, since it took me some time to understand why it was being done (it looked incorrect to myself and several other developers at first glance). Also, I noticed that Calcite's fromCalendarFields simply takes the fields directly from the Calendar, so making time-zone adjustments is unnecessary after the changes I made to toRexNode. I'll fix that as well in my next commit.

---
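The field-preserving adjustment under discussion can be illustrated with a self-contained java.util sketch. Here `adjustCalendar` is a hypothetical stand-in for the method in the PR: it rebuilds the same year/month/day/hour fields of a UTC-based Calendar in another zone, so field-based extraction (as in Calcite's fromCalendarFields) is unchanged even though the epoch millis shift.

```java
import java.util.Calendar;
import java.util.TimeZone;

public class Main {
    // Rebuild the calendar's field values (not its instant) in the target zone.
    static Calendar adjustCalendar(Calendar utcCal, TimeZone target) {
        Calendar adjusted = Calendar.getInstance(target);
        adjusted.clear();
        adjusted.set(utcCal.get(Calendar.YEAR), utcCal.get(Calendar.MONTH),
                utcCal.get(Calendar.DAY_OF_MONTH), utcCal.get(Calendar.HOUR_OF_DAY),
                utcCal.get(Calendar.MINUTE), utcCal.get(Calendar.SECOND));
        return adjusted;
    }

    public static void main(String[] args) {
        Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        utc.clear();
        utc.set(2017, Calendar.MAY, 1, 0, 0, 0); // the literal 2017-05-01 from the issue

        Calendar local = adjustCalendar(utc, TimeZone.getTimeZone("America/Los_Angeles"));
        // Field values are preserved even though the underlying instants differ.
        System.out.println("day=" + local.get(Calendar.DAY_OF_MONTH));
    }
}
```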
[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192091#comment-16192091 ]

ASF GitHub Bot commented on FLINK-7657:
---
Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142801455

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
@@ -49,10 +50,51 @@ object Literal {
     case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
     case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
   }
+
+  private[flink] def apply(rexNode: RexLiteral): Literal = {
+    val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType)
+
+    val literalValue = literalType match {
+      // Chrono use cases. We're force-adjusting the UTC-based epoch timestamps to a new
+      // timestamp such that we get the same year/month/hour/day field values in the query's
+      // timezone (UTC)
+      case _@SqlTimeTypeInfo.DATE =>
+        val rexValue = rexNode.getValueAs(classOf[DateString])
+        val adjustedCal = adjustCalendar(rexValue.toCalendar, TimeZone.getDefault)
+        new Date(adjustedCal.getTimeInMillis)
--- End diff --

Do you want me to use the deprecated constructor or leave this as-is?

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.1, 1.3.2
> Reporter: Kent Murra
> Assignee: Kent Murra
> Priority: Critical
>
> I have a SQL statement using the Tables API that has a timestamp in it. When
> the execution environment tries to optimize the SQL, it causes an exception
> (attached below). The result is any SQL query with a timestamp, date, or
> time literal is unexecutable if any table source is marked with
> FilterableTableSource.
> {code:none} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule PushFilterIntoTableSourceScanRule, args > [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, > $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], > fields:(data, last_updated))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) > at > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266) > at > org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328) > at > org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135) > at org.apache.flink.table.api.Table.writeToSink(table.scala:800) > at org.apache.flink.table.api.Table.writeToSink(table.scala:773) > at > com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27) > at > com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at > com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22) > at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala) > Caused by: 
java.lang.ClassCastException: java.util.GregorianCalendar cannot > be cast to java.util.Date > at > org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107) > at > org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80) > at > org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35) > at > org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35) > at >
[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192090#comment-16192090 ] ASF GitHub Bot commented on FLINK-7657: --- Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142801313 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala --- @@ -49,10 +50,51 @@ object Literal { case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME) case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP) } + + private[flink] def apply(rexNode: RexLiteral): Literal = { --- End diff -- I'll commit to using your standards for the code-base. However, allow me to voice a disagreement here: the Literal class performs the conversion from a Literal back to a RexLiteral. Putting this logic in RexNodeToExpressionConverter means the RexLiteral-to-Literal logic is physically split from the Literal-to-RexLiteral logic, which makes it slightly easier for a contributor to change one side of the conversion without accounting for the other. In particular, the date adjustments here become harder to understand since the context is split between two different files.
[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...
Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142801455 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala --- @@ -49,10 +50,51 @@ object Literal { case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME) case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP) } + + private[flink] def apply(rexNode: RexLiteral): Literal = { +val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType) + +val literalValue = literalType match { + // Chrono use cases. We're force-adjusting the UTC-based epoch timestamps to a new + // timestamp such that we get the same year/month/hour/day field values in the query's + // timezone (UTC) + case _@SqlTimeTypeInfo.DATE => +val rexValue = rexNode.getValueAs(classOf[DateString]) +val adjustedCal = adjustCalendar(rexValue.toCalendar, TimeZone.getDefault) +new Date(adjustedCal.getTimeInMillis) --- End diff -- Do you want me to use the deprecated constructor or leave this as-is? ---
[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...
Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142801313 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala --- @@ -49,10 +50,51 @@ object Literal { case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME) case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP) } + + private[flink] def apply(rexNode: RexLiteral): Literal = { --- End diff -- I'll commit to using your standards for the code-base. However, allow me to voice a disagreement here: the Literal class performs the conversion from a Literal back to a RexLiteral. Putting this logic in RexNodeToExpressionConverter means the RexLiteral-to-Literal logic is physically split from the Literal-to-RexLiteral logic, which makes it slightly easier for a contributor to change one side of the conversion without accounting for the other. In particular, the date adjustments here become harder to understand since the context is split between two different files. ---
[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192087#comment-16192087 ] ASF GitHub Bot commented on FLINK-7657: --- Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142800631 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala --- @@ -200,10 +201,30 @@ case class BatchTableTestUtil() extends TableTestUtil { printTable(tableEnv.sqlQuery(query)) } + def verifyExpressionProjection(fields: Seq[(String, TypeInformation[_])], --- End diff -- I will move this.
[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192085#comment-16192085 ] ASF GitHub Bot commented on FLINK-7657: --- Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142800611 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CheckExpressionsTableSource.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils + +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource} +import org.apache.flink.types.Row + +import java.util +import java.util.Collections + +import scala.collection.JavaConverters._ + +/** + * A table source that takes in assertions and applies them when applyPredicate is called. 
+ * Allows for testing that expression push downs are handled properly + * @param typeInfo The type info. + * @param assertions A set of assertions as a function reference + * @param pushedDown Whether this has been pushed down/ + */ +class CheckExpressionsTableSource(typeInfo: RowTypeInfo, --- End diff -- I'll look at doing that. It was my initial approach, but when I saw the potential set of test cases that would impact, I decided to do something more conservative. Still do-able.
[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...
Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142800631 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala --- @@ -200,10 +201,30 @@ case class BatchTableTestUtil() extends TableTestUtil { printTable(tableEnv.sqlQuery(query)) } + def verifyExpressionProjection(fields: Seq[(String, TypeInformation[_])], --- End diff -- I will move this. ---
[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...
Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142800611 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CheckExpressionsTableSource.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils + +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource} +import org.apache.flink.types.Row + +import java.util +import java.util.Collections + +import scala.collection.JavaConverters._ + +/** + * A table source that takes in assertions and applies them when applyPredicate is called. + * Allows for testing that expression push downs are handled properly + * @param typeInfo The type info. 
+ * @param assertions A set of assertions as a function reference + * @param pushedDown Whether this has been pushed down/ + */ +class CheckExpressionsTableSource(typeInfo: RowTypeInfo, --- End diff -- I'll look at doing that. It was my initial approach, but when I saw the potential set of test cases that would impact, I decided to do something more conservative. Still do-able. ---
[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...
Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142800400 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala --- @@ -103,13 +148,21 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre } } - private def dateToCalendar: Calendar = { + /** +* Convert a date value to a utc calendar. +* +* We're assuming that when the user passes in a Date its constructed from fields, +* such as days and hours, and that they want those fields to be in the same timezone as the +* Calcite times, which are UTC. Since we need to convert a Date to a Calendar, that means we +* have to shift the epoch millisecond timestamp to account for the difference between UTC and +* local time. +* @return Get the Calendar value +*/ + private def valueAsUtcCalendar: Calendar = { val date = value.asInstanceOf[java.util.Date] -val cal = Calendar.getInstance(Literal.GMT) -val t = date.getTime -// according to Calcite's SqlFunctions.internalToXXX methods -cal.setTimeInMillis(t + TimeZone.getDefault.getOffset(t)) --- End diff -- The re-implemented adjustCalendar method is functionally the same as this when toTz is the UTC TimeZone. It's just generalized to allow converting between arbitrary TimeZones so that I can re-use it in the RexLiteral-to-Expression conversion. ---
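To make the timezone-shifting idea above concrete, here is a minimal sketch of what a generalized `adjustCalendar` could look like. The signature follows the discussion (a source calendar and a target `TimeZone`), but the body is an illustrative assumption, not the PR's actual code: it shifts the epoch milliseconds by the difference between the two zones' offsets so that the wall-clock field values (year/month/day/hour) read the same in the target zone.

```java
import java.util.Calendar;
import java.util.TimeZone;

public class AdjustCalendarSketch {

    // Hypothetical re-implementation: shift the calendar's epoch millis so that
    // its field values (year/month/day/hour) are preserved when read in `toTz`.
    // Offsets are computed at the calendar's instant, which is adequate for a sketch.
    public static Calendar adjustCalendar(Calendar cal, TimeZone toTz) {
        long t = cal.getTimeInMillis();
        long adjusted = t + cal.getTimeZone().getOffset(t) - toTz.getOffset(t);
        Calendar result = Calendar.getInstance(toTz);
        result.setTimeInMillis(adjusted);
        return result;
    }

    public static void main(String[] args) {
        Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        utc.clear();
        utc.set(2017, Calendar.MAY, 1, 12, 0, 0); // 2017-05-01 12:00 in UTC
        Calendar local = adjustCalendar(utc, TimeZone.getTimeZone("America/Los_Angeles"));
        // The wall-clock hour is preserved in the target zone
        System.out.println(local.get(Calendar.HOUR_OF_DAY)); // prints 12
    }
}
```

When `toTz` is UTC and the source calendar is in the default zone, this reduces to the `t + TimeZone.getDefault.getOffset(t)` adjustment shown in the quoted diff.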
[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192084#comment-16192084 ] ASF GitHub Bot commented on FLINK-7657: --- Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142800400 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala --- @@ -103,13 +148,21 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre } } - private def dateToCalendar: Calendar = { + /** +* Convert a date value to a utc calendar. +* +* We're assuming that when the user passes in a Date its constructed from fields, +* such as days and hours, and that they want those fields to be in the same timezone as the +* Calcite times, which are UTC. Since we need to convert a Date to a Calendar, that means we +* have to shift the epoch millisecond timestamp to account for the difference between UTC and +* local time. +* @return Get the Calendar value +*/ + private def valueAsUtcCalendar: Calendar = { val date = value.asInstanceOf[java.util.Date] -val cal = Calendar.getInstance(Literal.GMT) -val t = date.getTime -// according to Calcite's SqlFunctions.internalToXXX methods -cal.setTimeInMillis(t + TimeZone.getDefault.getOffset(t)) --- End diff -- The re-implemented adjustCalendar method is functionally the same as this when toTz is the UTC TimeZone. It's just generalized to allow converting between arbitrary TimeZones so that I can re-use it in the RexLiteral-to-Expression conversion.
[jira] [Commented] (FLINK-6495) Migrate Akka configuration options
[ https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192078#comment-16192078 ] ASF GitHub Bot commented on FLINK-6495: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4774 Given that this caused instabilities, shouldn't we introduce a runtime check to make sure these options are configured correctly in relation to each other? We should also properly document in the javadocs that these values have a strong relationship. > Migrate Akka configuration options > -- > > Key: FLINK-6495 > URL: https://issues.apache.org/jira/browse/FLINK-6495 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Reporter: Chesnay Schepler >Assignee: Fang Yong > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4774: [FLINK-6495] Fix Akka's default value for heartbeat pause
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4774 Given that this caused instabilities, shouldn't we introduce a runtime check to make sure these options are configured correctly in relation to each other? We should also properly document in the javadocs that these values have a strong relationship. ---
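The runtime check suggested above could look something like the following sketch. The class, method, and message text are hypothetical stand-ins (not Flink's actual validation code); only the relationship being enforced — that the acceptable heartbeat pause must exceed the heartbeat interval for Akka's failure detector to work — comes from the discussion.

```java
// Hypothetical validation for two interdependent Akka settings. The config
// key names in the error message mirror the keys being migrated in FLINK-6495.
public final class AkkaWatchConfigCheck {

    // Fails fast at startup if the two related options are inconsistent.
    public static void checkHeartbeatSettings(long heartbeatIntervalMs, long acceptablePauseMs) {
        if (acceptablePauseMs <= heartbeatIntervalMs) {
            throw new IllegalArgumentException(
                "akka.watch.heartbeat.pause (" + acceptablePauseMs + " ms) must be greater than "
                    + "akka.watch.heartbeat.interval (" + heartbeatIntervalMs + " ms)");
        }
    }
}
```

A check like this turns a subtle runtime instability (spurious quarantining when the pause is too short) into an immediate, descriptive configuration error.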
[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192076#comment-16192076 ] ASF GitHub Bot commented on FLINK-7657: --- Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142799362 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala --- @@ -49,10 +50,51 @@ object Literal { case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME) case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP) } + + private[flink] def apply(rexNode: RexLiteral): Literal = { +val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType) + +val literalValue = literalType match { + // Chrono use cases. We're force-adjusting the UTC-based epoch timestamps to a new + // timestamp such that we get the same year/month/hour/day field values in the query's + // timezone (UTC) + case _@SqlTimeTypeInfo.DATE => +val rexValue = rexNode.getValueAs(classOf[DateString]) +val adjustedCal = adjustCalendar(rexValue.toCalendar, TimeZone.getDefault) +new Date(adjustedCal.getTimeInMillis) --- End diff -- Unfortunately that constructor is deprecated in Java 8, which is why I avoided using it.
[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...
Github user kmurra commented on a diff in the pull request: https://github.com/apache/flink/pull/4746#discussion_r142799362 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala --- @@ -49,10 +50,51 @@ object Literal { case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME) case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP) } + + private[flink] def apply(rexNode: RexLiteral): Literal = { +val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType) + +val literalValue = literalType match { + // Chrono use cases. We're force-adjusting the UTC-based epoch timestamps to a new + // timestamp such that we get the same year/month/hour/day field values in the query's + // timezone (UTC) + case _@SqlTimeTypeInfo.DATE => +val rexValue = rexNode.getValueAs(classOf[DateString]) +val adjustedCal = adjustCalendar(rexValue.toCalendar, TimeZone.getDefault) +new Date(adjustedCal.getTimeInMillis) --- End diff -- Unfortunately that constructor is deprecated in Java 8, which is why I avoided using it. ---
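The timezone adjustment discussed in this diff can be pictured with a small stand-alone sketch. The `adjustCalendar` helper below is a hypothetical reimplementation of the idea (the actual Flink method may differ): shift the epoch millis by the target zone's UTC offset so that the wall-clock field values of a UTC-based calendar are preserved in the target zone.

```java
import java.util.Calendar;
import java.util.TimeZone;

public class CalendarAdjust {

    // Hypothetical helper mirroring the adjustCalendar idea in the diff:
    // re-anchor the instant so that year/month/day/hour fields read the same
    // in the target zone as they did in UTC.
    public static Calendar adjustCalendar(Calendar utcCal, TimeZone target) {
        long utcMillis = utcCal.getTimeInMillis();
        // subtracting the (possibly negative) zone offset shifts the instant
        // so target-zone field values equal the original UTC field values
        long adjustedMillis = utcMillis - target.getOffset(utcMillis);
        Calendar result = Calendar.getInstance(target);
        result.setTimeInMillis(adjustedMillis);
        return result;
    }

    public static void main(String[] args) {
        Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        utc.clear();
        utc.set(2017, Calendar.MAY, 1);
        Calendar local = adjustCalendar(utc, TimeZone.getTimeZone("America/Los_Angeles"));
        // the date still reads as May 1st in the target zone
        System.out.println(local.get(Calendar.DAY_OF_MONTH));
    }
}
```

Note that `new Date(long)` (epoch millis), as used in the diff, is not deprecated; the deprecated `java.sql.Date` constructors are the field-based ones like `Date(int, int, int)`, which is presumably the alternative being declined in the comment.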
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192067#comment-16192067 ] ASF GitHub Bot commented on FLINK-7446: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4710 Thanks @wuchong I will merge this PR tomorrow. > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske > > Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field > for a {{TableSource}}. But it would be helpful if we can support to define an > existing field as the rowtime field. Just like registering a DataStream, the > rowtime field can be appended but also can replace an existing field. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4710: [FLINK-7446] [table] Change DefinedRowtimeAttribute to wo...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4710 Thanks @wuchong I will merge this PR tomorrow. ---
[GitHub] flink pull request #4776: [FLINK-7643] [core] Rework FileSystem loading to u...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142787795 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.core.fs.FileSystemFactory; + +/** + * A + */ +public class HadoopFileSystemFactoryLoader { + + private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory"; --- End diff -- why not using `HadoopFsFactory.class.getCanonicalName()`? ---
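A plausible answer to the question above (an assumption about the design, not stated in the thread): `flink-core` cannot reference `HadoopFsFactory.class` at all, because that class lives in `flink-runtime` and may be absent from the classpath; a plain `String` keeps the lookup purely reflective. The pattern can be sketched as:

```java
public class ReflectiveLookup {

    /**
     * Checks whether a class is loadable without initializing it. Using a
     * String name (rather than a Foo.class literal) means this code compiles
     * and runs even when the named class is missing from the classpath.
     */
    public static boolean isPresent(String className, ClassLoader cl) {
        try {
            Class.forName(className, false, cl);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

With a class literal, the compiler would demand the dependency at build time and the JVM would need it at link time, defeating the optional-dependency setup.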
[jira] [Commented] (FLINK-7643) Configure FileSystems only once
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192019#comment-16192019 ] ASF GitHub Bot commented on FLINK-7643: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142787414 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.core.fs.FileSystemFactory; + +/** + * A --- End diff -- A? > Configure FileSystems only once > --- > > Key: FLINK-7643 > URL: https://issues.apache.org/jira/browse/FLINK-7643 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Ufuk Celebi >Assignee: Stephan Ewen > > HadoopFileSystem always reloads GlobalConfiguration, which potentially leads > to a lot of noise in the logs, because this happens on each checkpoint. > Instead, file systems should be configured once upon process startup, when > the configuration is loaded. 
> This will also increase efficiency of checkpoints, as it avoids redundant > parsing for each data chunk. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7643) Configure FileSystems only once
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192022#comment-16192022 ] ASF GitHub Bot commented on FLINK-7643: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142787795 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.core.fs.FileSystemFactory; + +/** + * A + */ +public class HadoopFileSystemFactoryLoader { + + private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory"; --- End diff -- why not using `HadoopFsFactory.class.getCanonicalName()`? 
> Configure FileSystems only once > --- > > Key: FLINK-7643 > URL: https://issues.apache.org/jira/browse/FLINK-7643 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Ufuk Celebi >Assignee: Stephan Ewen > > HadoopFileSystem always reloads GlobalConfiguration, which potentially leads > to a lot of noise in the logs, because this happens on each checkpoint. > Instead, file systems should be configured once upon process startup, when > the configuration is loaded. > This will also increase efficiency of checkpoints, as it avoids redundant > parsing for each data chunk. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7643) Configure FileSystems only once
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192018#comment-16192018 ] ASF GitHub Bot commented on FLINK-7643: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142788553 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.core.fs.FileSystemFactory; + +/** + * A + */ +public class HadoopFileSystemFactoryLoader { + + private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory"; + + private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration"; + + private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem"; + + + public static FileSystemFactory loadFactory() { + final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader(); + + // first, see if the Flink runtime classes are available + final Class factoryClass; + try { + factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class); + } + catch (ClassNotFoundException e) { + return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies."); --- End diff -- why return an unsupported factory which will fail later, rather than fail early here in class loader? > Configure FileSystems only once > --- > > Key: FLINK-7643 > URL: https://issues.apache.org/jira/browse/FLINK-7643 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Ufuk Celebi >Assignee: Stephan Ewen > > HadoopFileSystem always reloads GlobalConfiguration, which potentially leads > to a lot of noise in the logs, because this happens on each checkpoint. > Instead, file systems should be configured once upon process startup, when > the configuration is loaded. > This will also increase efficiency of checkpoints, as it avoids redundant > parsing for each data chunk. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7643) Configure FileSystems only once
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192021#comment-16192021 ] ASF GitHub Bot commented on FLINK-7643: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142789177 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/UnsupportedSchemeFactory.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.net.URI; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A file system factory to throw an UnsupportedFileSystemSchemeException when called. + */ +public class UnsupportedSchemeFactory implements FileSystemFactory { --- End diff -- I feel this is unnecessary. 
We can fail Flink faster and earlier, instead of having to wait until `create()` is called > Configure FileSystems only once > --- > > Key: FLINK-7643 > URL: https://issues.apache.org/jira/browse/FLINK-7643 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Ufuk Celebi >Assignee: Stephan Ewen > > HadoopFileSystem always reloads GlobalConfiguration, which potentially leads > to a lot of noise in the logs, because this happens on each checkpoint. > Instead, file systems should be configured once upon process startup, when > the configuration is loaded. > This will also increase efficiency of checkpoints, as it avoids redundant > parsing for each data chunk. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4776: [FLINK-7643] [core] Rework FileSystem loading to u...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142789177 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/UnsupportedSchemeFactory.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.net.URI; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A file system factory to throw an UnsupportedFileSystemSchemeException when called. + */ +public class UnsupportedSchemeFactory implements FileSystemFactory { --- End diff -- I feel this is unnecessary. We can fail Flink faster and earlier, instead of having to wait until `create()` is called ---
[jira] [Commented] (FLINK-7643) Configure FileSystems only once
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192020#comment-16192020 ] ASF GitHub Bot commented on FLINK-7643: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142788766 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/MapRFsFactory.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.URI; + +/** + * A factory for the MapR file system. + * + * This factory tries to reflectively instantiate the MapR file system. It can only be + * used when the MapR FS libraries are in the classpath. 
+ */ +public class MapRFsFactory implements FileSystemFactory { + + private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem"; --- End diff -- ditto > Configure FileSystems only once > --- > > Key: FLINK-7643 > URL: https://issues.apache.org/jira/browse/FLINK-7643 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Ufuk Celebi >Assignee: Stephan Ewen > > HadoopFileSystem always reloads GlobalConfiguration, which potentially leads > to a lot of noise in the logs, because this happens on each checkpoint. > Instead, file systems should be configured once upon process startup, when > the configuration is loaded. > This will also increase efficiency of checkpoints, as it avoids redundant > parsing for each data chunk. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4776: [FLINK-7643] [core] Rework FileSystem loading to u...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142787414 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.core.fs.FileSystemFactory; + +/** + * A --- End diff -- A? ---
[GitHub] flink pull request #4776: [FLINK-7643] [core] Rework FileSystem loading to u...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142788553 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.core.fs.FileSystemFactory; + +/** + * A + */ +public class HadoopFileSystemFactoryLoader { + + private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory"; + + private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration"; + + private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem"; + + + public static FileSystemFactory loadFactory() { + final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader(); + + // first, see if the Flink runtime classes are available + final Class factoryClass; + try { + factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class); + } + catch (ClassNotFoundException e) { + return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies."); --- End diff -- why return an unsupported factory which will fail later, rather than fail early here in class loader? ---
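A likely rationale for the deferred failure questioned above (an inference, not a statement from the PR author): a process that never touches an HDFS path should not crash at startup just because optional Hadoop dependencies are missing, so the loader always returns a factory and the error surfaces only if the scheme is actually used. The pattern, with simplified stand-in interfaces rather than Flink's real API, looks like:

```java
// Simplified stand-in interfaces (not Flink's actual API) showing the
// deferred-failure pattern: loading always succeeds, and the error only
// surfaces when the unsupported scheme is actually requested.
interface FsFactory {
    Object create(String uri) throws Exception;
}

class UnsupportedSchemeFactory implements FsFactory {
    private final String reason;

    UnsupportedSchemeFactory(String reason) {
        this.reason = reason;
    }

    @Override
    public Object create(String uri) {
        throw new UnsupportedOperationException(
            "Cannot create file system for " + uri + ": " + reason);
    }
}

class FactoryLoader {
    static FsFactory loadFactory(String factoryClassName, ClassLoader cl) {
        try {
            Class<?> clazz = Class.forName(factoryClassName, false, cl);
            return (FsFactory) clazz.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            // do not fail here: jobs that never use this scheme still run
            return new UnsupportedSchemeFactory("classes missing from classpath/dependencies");
        }
    }
}
```

Failing early in the loader, as the reviewer suggests, would instead break every deployment without the optional dependency, including those that never use the scheme.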
[GitHub] flink pull request #4776: [FLINK-7643] [core] Rework FileSystem loading to u...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4776#discussion_r142788766 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/factories/MapRFsFactory.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.factories; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.URI; + +/** + * A factory for the MapR file system. + * + * This factory tries to reflectively instantiate the MapR file system. It can only be + * used when the MapR FS libraries are in the classpath. + */ +public class MapRFsFactory implements FileSystemFactory { + + private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem"; --- End diff -- ditto ---
[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191968#comment-16191968 ] ASF GitHub Bot commented on FLINK-6094: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r142779879 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.WindowJoinUtil +import scala.collection.JavaConverters._ + +class DataStreamJoinRule + extends ConverterRule( +classOf[FlinkLogicalJoin], +FlinkConventions.LOGICAL, +FlinkConventions.DATASTREAM, +"DataStreamJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] +val joinInfo = join.analyzeCondition + +val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate( + joinInfo.getRemaining(join.getCluster.getRexBuilder), + join.getLeft.getRowType.getFieldCount, + join.getRowType, + join.getCluster.getRexBuilder, + TableConfig.DEFAULT) + +// remaining predicate must not access time attributes +val remainingPredsAccessTime = remainingPreds.isDefined && + WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType) + +// Check that no event-time attributes are in the input. +val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala + .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +if (!windowBounds.isDefined && !remainingPredsAccessTime && !rowTimeAttrInOutput) { --- End diff -- @shaoxuan-wang, I thought about this issue again and think you are right. It would be quite difficult for users to get the queries right and also difficult to properly document the restrictions. 
IMO, it would be good to evolve the relational APIs such that most operators can be executed on time indicator attributes (event or proc time) or not. In case of time indicator attributes, we can generate more efficient plans with built-in state clean-up. A generic stream-stream join such as the one proposed in the PR would be a first step in this direction. As you said before, a major challenge with this approach would be to help users configure state cleanup timers. I would propose to extend the EXPLAIN information with state size estimates. This would help users to correctly set the query configuration. I will go over my comments for this PR again and adapt them where necessary. Thanks, Fabian > Implement stream-stream proctime non-window inner join > --- > > Key: FLINK-6094 > URL: https://issues.apache.org/jira/browse/FLINK-6094 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > This includes: > 1.Implement stream-stream proctime non-window inner join > 2.Implement the retract process logic for join -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r142779879 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.WindowJoinUtil +import scala.collection.JavaConverters._ + +class DataStreamJoinRule + extends ConverterRule( +classOf[FlinkLogicalJoin], +FlinkConventions.LOGICAL, +FlinkConventions.DATASTREAM, +"DataStreamJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] +val joinInfo = join.analyzeCondition + +val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate( + joinInfo.getRemaining(join.getCluster.getRexBuilder), + join.getLeft.getRowType.getFieldCount, + join.getRowType, + join.getCluster.getRexBuilder, + TableConfig.DEFAULT) + +// remaining predicate must not access time attributes +val remainingPredsAccessTime = remainingPreds.isDefined && + WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType) + +// Check that no event-time attributes are in the input. +val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala + .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +if (!windowBounds.isDefined && !remainingPredsAccessTime && !rowTimeAttrInOutput) { --- End diff -- @shaoxuan-wang, I thought about this issue again and think you are right. It would be quite difficult for users to get the queries right and also difficult to properly document the restrictions. 
IMO, it would be good to evolve the relational APIs such that most operators can be executed on time indicator attributes (event or proc time) or not. In the case of time indicator attributes, we can generate more efficient plans with built-in state clean-up. A generic stream-stream join such as the one proposed in the PR would be a first step in this direction. As you said before, a major challenge with this approach would be to help users configure state cleanup timers. I would propose to extend the EXPLAIN information with state size estimates. This would help users to correctly set the query configuration. I will go over my comments for this PR again and adapt them where necessary. Thanks, Fabian ---
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191914#comment-16191914 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142762460 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of the left stream + * @param rightType the input type of the right stream + * @param genJoinFuncName the function name of the other non-equi conditions + * @param genJoinFuncCode the function code of the other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper:
CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and
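The bound arithmetic declared in the excerpt above (`leftRelativeSize`, `rightRelativeSize`, `cleanupDelay`, and the `allowedLateness` check) can be exercised with a standalone sketch. Plain Java, no Flink dependencies; only the field names come from the PR, the wrapper class `JoinBounds` is illustrative:

```java
// Mirrors the bound arithmetic of the quoted TimeBoundedStreamInnerJoin fields.
// For "L.time BETWEEN R.time + X AND R.time + Y": X = leftLowerBound, Y = leftUpperBound.
public class JoinBounds {
    final long leftRelativeSize;   // -leftLowerBound: how far the join window reaches back
    final long rightRelativeSize;  // leftUpperBound: how far it reaches forward
    final long cleanupDelay;       // half the total window, used to batch state cleanup

    public JoinBounds(long leftLowerBound, long leftUpperBound, long allowedLateness) {
        if (allowedLateness < 0) {
            throw new IllegalArgumentException("The allowed lateness must be non-negative.");
        }
        this.leftRelativeSize = -leftLowerBound;
        this.rightRelativeSize = leftUpperBound;
        this.cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2;
    }
}
```

For bounds of -5000 ms and +6000 ms this gives relative sizes of 5000 and 6000 and a cleanup delay of 5500 ms, i.e. expired rows are kept up to half a window longer so that cleanup timers fire less often.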
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191911#comment-16191911 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142705185 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191896#comment-16191896 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142692599 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, --- End diff -- The time indices are only needed by `RowTimeBoundedStreamInnerJoin`. They can be removed here.
> Support rowtime inner equi-join between two streams in the SQL API > -- > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: Xingcan Cui > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only support inner join > * The ON clause should include equi-join condition > * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL > '1' HOUR}} only can use rowtime that is a system attribute, the time > condition only support bounded time range like {{o.rowtime BETWEEN s.rowtime > - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not support > unbounded like {{o.rowtime s.rowtime}} , and should include both two > stream's rowtime attribute, {{o.rowtime between rowtime () and rowtime () + >
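The refactoring suggested in the comment above — keep the time indices out of the abstract base class, since only the row-time variant reads event timestamps from the rows — can be sketched as follows (class names echo the PR, the stripped-down bodies are illustrative):

```java
// Base class holds only what the proc-time and row-time variants share:
// the window bounds. No timestamp field indices here.
abstract class TimeBoundedJoinBase {
    final long leftLowerBound;
    final long leftUpperBound;

    TimeBoundedJoinBase(long leftLowerBound, long leftUpperBound) {
        this.leftLowerBound = leftLowerBound;
        this.leftUpperBound = leftUpperBound;
    }
}

// Only the row-time variant needs to know which fields carry the event
// timestamps, so only it receives the indices.
class RowTimeBoundedJoin extends TimeBoundedJoinBase {
    final int leftTimeIdx;
    final int rightTimeIdx;

    RowTimeBoundedJoin(long leftLowerBound, long leftUpperBound,
                       int leftTimeIdx, int rightTimeIdx) {
        super(leftLowerBound, leftUpperBound);
        this.leftTimeIdx = leftTimeIdx;
        this.rightTimeIdx = rightTimeIdx;
    }
}
```

The proc-time variant would then extend the base class without the extra constructor parameters, which is the point of the review comment.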
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191908#comment-16191908 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142769397 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -102,5 +117,154 @@ class JoinITCase extends StreamingWithStateTestBase { env.execute() } + /** test rowtime inner join **/ + @Test + def testRowTimeInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.clear +env.setParallelism(1) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 join T2 as t2 ON +| t1.a = t2.a AND +| t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND +|t2.rt + INTERVAL '6' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(Int, Long, String, Long)] --- End diff -- Add two rows with null keys on both sides within join window boundaries to test that join predicates on null values are not evaluated to true. For this to work we need to also fix the `keyBy()` calls to support partitioning of null keys (see #4732) > Support rowtime inner equi-join between two streams in the SQL API > -- > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: Xingcan Cui > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only support inner join > * The ON clause should include equi-join condition > * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL > '1' HOUR}} only can use rowtime that is a system attribute, the time > condition only support bounded time range like {{o.rowtime BETWEEN s.rowtime > - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not support > unbounded like {{o.rowtime s.rowtime}} , and should include both two > stream's rowtime attribute, {{o.rowtime between rowtime () and rowtime () + > 1}} should also not be supported. > An row-time streams join will not be able to handle late data, because this > would mean in insert a row into a sorted order shift all other computations. > This would be too expensive to maintain. Therefore, we will throw an error if > a user tries to use an row-time stream join with late data handling. > This issue includes: > * Design of the DataStream operator to deal with stream join > * Translation from Calcite's RelNode representation (LogicalJoin). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
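The bounded time condition described in the restrictions above reduces to a pair of long comparisons once the interval is normalized to "L.time BETWEEN R.time + X AND R.time + Y"; a minimal sketch (hypothetical helper, not part of the PR):

```java
public class BoundedTimePredicate {
    // True iff leftTime lies in [rightTime + lowerBound, rightTime + upperBound].
    // For "o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR",
    // lowerBound = -3600000 and upperBound = 3600000 (milliseconds).
    public static boolean withinBounds(long leftTime, long rightTime,
                                       long lowerBound, long upperBound) {
        return leftTime >= rightTime + lowerBound && leftTime <= rightTime + upperBound;
    }
}
```

An unbounded condition such as `o.rowtime > s.rowtime` has no finite [X, Y] pair, which is why the restrictions exclude it: without both bounds the operator could never expire cached rows.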
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191907#comment-16191907 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142706610 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191901#comment-16191901 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142691841 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. 
+ */ +final class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx) { + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +leftOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() + else 0L +rightOperatorTime = --- End diff -- just use `leftOperatorTime` to avoid the additional method calls and condition?
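The `updateOperatorTime` override quoted in this comment clamps the current watermark to a non-negative value before using it as operator time; extracted into a standalone sketch, with Flink's `TimerService` replaced by a plain long:

```java
public class OperatorTime {
    // Row-time operator time is the current watermark, floored at 0 so that an
    // uninitialized watermark (Long.MIN_VALUE on a fresh operator) does not
    // produce a negative time and drive expiration backwards.
    public static long fromWatermark(long currentWatermark) {
        return currentWatermark > 0 ? currentWatermark : 0L;
    }
}
```

Since both inputs of the co-process function observe the same watermark, one call suffices for both sides, which is the reviewer's point about reusing `leftOperatorTime` instead of evaluating the condition twice.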
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191906#comment-16191906 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142694926 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191903#comment-16191903 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142697408 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * A CoProcessFunction to execute time-bounded stream inner-join.
+ * Two kinds of time criteria:
+ * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X".
+ *
+ * @param leftLowerBound  the lower bound for the left stream (X in the criteria)
+ * @param leftUpperBound  the upper bound for the left stream (Y in the criteria)
+ * @param allowedLateness the lateness allowed for the two streams
+ * @param leftType        the input type of the left stream
+ * @param rightType       the input type of the right stream
+ * @param genJoinFuncName the function name of other non-equi conditions
+ * @param genJoinFuncCode the function code of other non-equi conditions
+ */
+abstract class TimeBoundedStreamInnerJoin(
+    private val leftLowerBound: Long,
+    private val leftUpperBound: Long,
+    private val allowedLateness: Long,
+    private val leftType: TypeInformation[Row],
+    private val rightType: TypeInformation[Row],
+    private val genJoinFuncName: String,
+    private val genJoinFuncCode: String,
+    private val leftTimeIdx: Int,
+    private val rightTimeIdx: Int)
+  extends CoProcessFunction[CRow, CRow, CRow]
+    with Compiler[FlatJoinFunction[Row, Row, Row]]
+    with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  private var leftExpirationTime: Long = 0L
+  private var rightExpirationTime: Long = 0L
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+  // for delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+    throw new IllegalArgumentException("The allowed lateness must be non-negative.")
+  }
+
+  /**
+   * Get the maximum interval between receiving a row and emitting it (as part of a joined result).
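The excerpt above derives its cache-retention bookkeeping from the join bounds: `leftRelativeSize = -leftLowerBound`, `rightRelativeSize = leftUpperBound`, and a cleanup delay of half their sum. A small standalone sketch (not the Flink operator itself; the sample bounds below are hypothetical) shows the arithmetic:

```scala
// Standalone sketch of the bound bookkeeping in TimeBoundedStreamInnerJoin.
// For a predicate "L.time BETWEEN R.time + X AND R.time + Y":
//   leftLowerBound = X, leftUpperBound = Y (both in milliseconds).
object BoundArithmetic {
  // How long each side's rows must be cached relative to the other side's time.
  def relativeSizes(leftLowerBound: Long, leftUpperBound: Long): (Long, Long) =
    (-leftLowerBound, leftUpperBound)

  // The operator batches state cleanup; half the total window is the delay.
  def cleanupDelay(leftRelativeSize: Long, rightRelativeSize: Long): Long =
    (leftRelativeSize + rightRelativeSize) / 2
}
```

For example, with X = -1 hour and Y = +2 hours, `relativeSizes(-3600000L, 7200000L)` gives `(3600000, 7200000)` and the cleanup delay is 5400000 ms (90 minutes).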
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191899#comment-16191899 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142681890 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
@@ -152,19 +176,40 @@ class DataStreamWindowJoin(
     }
   }

-  def createProcTimeInnerJoinFunction(
+  def createEmptyInnerJoin(
+      leftDataStream: DataStream[CRow],
+      rightDataStream: DataStream[CRow],
+      returnTypeInfo: TypeInformation[CRow]) = {
+    leftDataStream.connect(rightDataStream).process(
+      new CoProcessFunction[CRow, CRow, CRow] {
+        override def processElement1(
--- End diff --
add `Unit` return types for both `processElement` methods.
> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: hongyuhong
> Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause should include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}} can only use rowtime, which is a system attribute. The time condition must be a bounded range like {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}; unbounded conditions such as {{o.rowtime > s.rowtime}} are not supported. The condition must include both streams' rowtime attributes, and {{o.rowtime between rowtime() and rowtime() + 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because inserting a row into a sorted order would shift all other computations. This would be too expensive to maintain. Therefore, we will throw an error if a user tries to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
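The `Unit` return-type suggestion above can be illustrated with a minimal stand-in (a simplified two-input function, not Flink's actual `CoProcessFunction`; the names here are hypothetical):

```scala
// Simplified stand-in for a two-input processing function. Without an explicit
// return type, `override def processElement1(...) = { ... }` lets the compiler
// infer one; declaring `: Unit` documents the side-effecting contract.
abstract class TwoInputFunction[IN1, IN2, OUT] {
  def processElement1(value: IN1, out: OUT => Unit): Unit
  def processElement2(value: IN2, out: OUT => Unit): Unit
}

class PassThrough extends TwoInputFunction[String, String, String] {
  // explicit `Unit` return types, as the review asks for
  override def processElement1(value: String, out: String => Unit): Unit = out(value)
  override def processElement2(value: String, out: String => Unit): Unit = out(value)
}
```

Declaring `Unit` also guards against a refactoring accidentally turning the method into an expression of some other inferred type.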
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191894#comment-16191894 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142681690 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
@@ -152,19 +176,40 @@ class DataStreamWindowJoin(
     }
   }

-  def createProcTimeInnerJoinFunction(
+  def createEmptyInnerJoin(
--- End diff --
please add the return type for the method `DataStream[CRow]`
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
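The point of declaring `DataStream[CRow]` generalizes: when a Scala method body is an expression, the compiler infers the result type, and a later change to the body can silently change the public contract. A self-contained sketch (the `DataStream` below is a hypothetical simplified stand-in, not Flink's class):

```scala
object JoinFactory {
  // Hypothetical stand-in for Flink's DataStream[CRow], just enough to show
  // the annotated result type on a factory method.
  final case class DataStream[T](elements: Vector[T]) {
    def connectWith(other: DataStream[T]): DataStream[T] =
      DataStream(elements ++ other.elements)
  }

  // Without ": DataStream[String]" the result type would be inferred from the
  // body; declaring it pins down the method's contract, as the review requests.
  def createEmptyInnerJoin(left: DataStream[String], right: DataStream[String]): DataStream[String] =
    left.connectWith(right)
}
```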
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191910#comment-16191910 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142762915 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191912#comment-16191912 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142714462 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191900#comment-16191900 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142699858 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191893#comment-16191893 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142679463 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala ---
@@ -105,6 +104,8 @@ class DataStreamWindowJoinRule
         windowBounds.get.isEventTime,
         windowBounds.get.leftLowerBound,
         windowBounds.get.leftUpperBound,
+        windowBounds.get.leftTimeIdx,
+        windowBounds.get.rightTimeIdx,
         remainCondition,
--- End diff --
The `remainCondition` must include the equi-join predicates to ensure that the join condition is correctly evaluated for `null` values (see FLINK-7755 for details). To solve this, I'd suggest to call `WindowJoinUtil.extractWindowBoundsFromPredicate` with `join.getCondition` instead of `joinInfo.getRemaining(join.getCluster.getRexBuilder)`.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
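The `null`-handling concern behind this comment (the FLINK-7755 context) comes from SQL's three-valued equality: `NULL = NULL` is not true, so rows with `NULL` join keys must never match. A self-contained sketch of the semantics the equi-join predicate must preserve (the `Order`/`Shipment` records are hypothetical, not the planner code):

```scala
object JoinSemantics {
  // Hypothetical records standing in for the two joined streams.
  case class Order(orderId: Integer, product: String)
  case class Shipment(orderId: Integer, ship: String)

  // SQL equality is three-valued: if either side is NULL the predicate is not
  // true, so the row pair is filtered out rather than matched.
  def sqlEquals(a: Integer, b: Integer): Boolean =
    a != null && b != null && a == b

  def innerJoin(orders: Seq[Order], shipments: Seq[Shipment]): Seq[(Order, Shipment)] =
    for (o <- orders; s <- shipments if sqlEquals(o.orderId, s.orderId)) yield (o, s)
}
```

Joining `Order(null, …)` with `Shipment(null, …)` therefore produces no pair; only equal non-null ids match. Dropping the equi-join predicate from the evaluated condition (for example, relying solely on key grouping) would incorrectly treat the two `NULL` keys as equal.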
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142762761 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
@@ -0,0 +1,410 @@
+  /**
+   * Get the maximum interval between receiving a row and emitting it (as part of a joined result).
+   * Only reasonable for row time join.
+   *
+   * @return the maximum delay for the outputs
+   */
+  def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
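The excerpt is cut off inside `getMaxOutputDelay`. A consistent definition, sketched here as an assumption rather than the PR's verbatim code: a row may sit in the cache for the larger of the two relative window sizes, plus any allowed lateness, before it can still contribute to a joined result.

```scala
object OutputDelay {
  // Sketch (assumption, not the PR's exact body): the maximum delay between
  // receiving a row and emitting it as part of a joined result.
  def getMaxOutputDelay(leftRelativeSize: Long,
                        rightRelativeSize: Long,
                        allowedLateness: Long): Long =
    Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness
}
```

For relative sizes of one and two hours with no allowed lateness, this yields 7200000 ms.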
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142762460
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * A CoProcessFunction to execute time-bounded stream inner-join.
+ * Two kinds of time criteria:
+ * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X".
+ *
+ * @param leftLowerBound  the lower bound for the left stream (X in the criteria)
+ * @param leftUpperBound  the upper bound for the left stream (Y in the criteria)
+ * @param allowedLateness the lateness allowed for the two streams
+ * @param leftType        the input type of left stream
+ * @param rightType       the input type of right stream
+ * @param genJoinFuncName the function code of other non-equi conditions
+ * @param genJoinFuncCode the function name of other non-equi conditions
+ */
+abstract class TimeBoundedStreamInnerJoin(
+    private val leftLowerBound: Long,
+    private val leftUpperBound: Long,
+    private val allowedLateness: Long,
+    private val leftType: TypeInformation[Row],
+    private val rightType: TypeInformation[Row],
+    private val genJoinFuncName: String,
+    private val genJoinFuncCode: String,
+    private val leftTimeIdx: Int,
+    private val rightTimeIdx: Int)
+    extends CoProcessFunction[CRow, CRow, CRow]
+    with Compiler[FlatJoinFunction[Row, Row, Row]]
+    with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  private var leftExpirationTime: Long = 0L;
+  private var rightExpirationTime: Long = 0L;
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+  // for delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+    throw new IllegalArgumentException("The allowed lateness must be non-negative.")
+  }
+
+  /**
+   * Get the maximum interval between receiving a row and emitting it (as part of a joined result).
+   * Only reasonable for row time join.
+   *
+   * @return the maximum delay for the outputs
+   */
+  def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
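The fields quoted above derive each side's relative cache-retention size and the delayed-cleanup interval directly from the join bounds. A minimal, self-contained Scala sketch of that arithmetic (illustrative only; the names mirror the quoted fields and the sample bound values are assumptions, e.g. a "BETWEEN R.time - 1h AND R.time + 2h" condition):

```scala
// Illustration of the bound arithmetic in TimeBoundedStreamInnerJoin
// (not the PR's code). For "L.time BETWEEN R.time - 1h AND R.time + 2h":
// leftLowerBound (X) = -3600000 ms, leftUpperBound (Y) = 7200000 ms.
object JoinBoundsSketch {
  // how long each side's rows must be retained, relative to the other side
  def relativeSizes(leftLowerBound: Long, leftUpperBound: Long): (Long, Long) =
    (-leftLowerBound, leftUpperBound) // (leftRelativeSize, rightRelativeSize)

  // the quoted class delays state cleanup by half the total window size
  def cleanupDelay(leftRelativeSize: Long, rightRelativeSize: Long): Long =
    (leftRelativeSize + rightRelativeSize) / 2

  def main(args: Array[String]): Unit = {
    val (l, r) = relativeSizes(-3600000L, 7200000L)
    println(s"leftRelativeSize=$l, rightRelativeSize=$r, cleanupDelay=${cleanupDelay(l, r)}")
  }
}
```

Note the sign convention: a negative `leftLowerBound` (X) means left rows may arrive up to `-X` milliseconds after the matching right row, which is why `leftRelativeSize = -leftLowerBound`.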
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191909#comment-16191909 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142710061
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
(diff omitted: verbatim repeat of the TimeBoundedStreamInnerJoin.scala diff quoted above)
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191895#comment-16191895 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142682283
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
@@ -152,19 +176,40 @@ class DataStreamWindowJoin(
     }
   }
 
-  def createProcTimeInnerJoinFunction(
+  def createEmptyInnerJoin(
+      leftDataStream: DataStream[CRow],
+      rightDataStream: DataStream[CRow],
+      returnTypeInfo: TypeInformation[CRow]) = {
+    leftDataStream.connect(rightDataStream).process(
+      new CoProcessFunction[CRow, CRow, CRow] {
+        override def processElement1(
+            value: CRow,
+            ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+            out: Collector[CRow]) = {
+          // Do nothing.
+        }
+        override def processElement2(
+            value: CRow,
+            ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+            out: Collector[CRow]) = {
+          // Do nothing.
+        }
+      })
--- End diff --
add a `returns(returnTypeInfo)` call to ensure we use the right type.
> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>          Key: FLINK-6233
>          URL: https://issues.apache.org/jira/browse/FLINK-6233
>      Project: Flink
>   Issue Type: Sub-task
>   Components: Table API & SQL
>     Reporter: hongyuhong
>     Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner join.
> * The ON clause should include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}} can only use rowtime, which is a system attribute. The time condition only supports a bounded time range like {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not an unbounded one like {{o.rowtime > s.rowtime}}, and it should include both streams' rowtime attributes; {{o.rowtime between rowtime () and rowtime () + 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because inserting a row into a sorted order would shift all other computations. This would be too expensive to maintain. Therefore, we will throw an error if a user tries to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
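The bounded time condition above can be written from either stream's perspective; the operator's Scaladoc lists the two equivalent forms "L.time between R.time + X and R.time + Y" and "R.time between L.time - Y and L.time - X". A small Scala sketch (illustrative, not PR code) checking that the two forms denote the same predicate:

```scala
// Sketch (not the PR's code): the two time-criterion forms from the
// TimeBoundedStreamInnerJoin Scaladoc are the same predicate on (L.time, R.time).
object TimeCriterionSketch {
  // "L.time BETWEEN R.time + X AND R.time + Y"
  def leftForm(lTime: Long, rTime: Long, x: Long, y: Long): Boolean =
    lTime >= rTime + x && lTime <= rTime + y

  // "R.time BETWEEN L.time - Y AND L.time - X"
  def rightForm(lTime: Long, rTime: Long, x: Long, y: Long): Boolean =
    rTime >= lTime - y && rTime <= lTime - x

  def main(args: Array[String]): Unit = {
    // exhaustive check on a small grid of timestamps for X = 2, Y = 7
    val agree = (for (l <- 0L to 20L; r <- 0L to 20L)
      yield leftForm(l, r, 2L, 7L) == rightForm(l, r, 2L, 7L)).forall(identity)
    println(s"forms agree: $agree")
  }
}
```

Algebraically, `L >= R + X` is `R <= L - X` and `L <= R + Y` is `R >= L - Y`, which is exactly why the join can cache and probe symmetrically from either side.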
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142699858
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
(diff omitted: verbatim repeat of the TimeBoundedStreamInnerJoin.scala diff quoted above)
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191897#comment-16191897 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142685896
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
@@ -184,4 +229,50 @@ class DataStreamWindowJoin(
         .returns(returnTypeInfo)
     }
   }
+
+  def createRowTimeInnerJoin(
+      leftDataStream: DataStream[CRow],
+      rightDataStream: DataStream[CRow],
+      returnTypeInfo: TypeInformation[CRow],
+      joinFunctionName: String,
+      joinFunctionCode: String,
+      leftKeys: Array[Int],
+      rightKeys: Array[Int]): DataStream[CRow] = {
+
+    val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin(
+      leftLowerBound,
+      leftUpperBound,
+      allowedLateness = 0L,
+      leftSchema.typeInfo,
+      rightSchema.typeInfo,
+      joinFunctionName,
+      joinFunctionCode,
+      leftTimeIdx,
+      rightTimeIdx)
+
+    if (!leftKeys.isEmpty) {
+      leftDataStream
+        .connect(rightDataStream)
+        .keyBy(leftKeys, rightKeys)
--- End diff --
we need to make sure to include the fixes of #4732
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191902#comment-16191902 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142688996
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
@@ -184,4 +229,50 @@ class DataStreamWindowJoin(
         .returns(returnTypeInfo)
     }
   }
+
+  def createRowTimeInnerJoin(
+      leftDataStream: DataStream[CRow],
+      rightDataStream: DataStream[CRow],
+      returnTypeInfo: TypeInformation[CRow],
+      joinFunctionName: String,
+      joinFunctionCode: String,
+      leftKeys: Array[Int],
+      rightKeys: Array[Int]): DataStream[CRow] = {
+
+    val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin(
+      leftLowerBound,
+      leftUpperBound,
+      allowedLateness = 0L,
+      leftSchema.typeInfo,
+      rightSchema.typeInfo,
+      joinFunctionName,
+      joinFunctionCode,
+      leftTimeIdx,
+      rightTimeIdx)
+
+    if (!leftKeys.isEmpty) {
+      leftDataStream
+        .connect(rightDataStream)
+        .keyBy(leftKeys, rightKeys)
+        .transform(
+          "InnerRowtimeWindowJoin",
+          returnTypeInfo,
+          new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow](
--- End diff --
In the current implementation the `KEY` type would be a `Tuple`, but I think we can just pass `_` here. When we adopt #4732, the key will be `Row`.
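The reviewer's suggestion to pass `_` for the `KEY` type parameter relies on Scala's wildcard (existential) type arguments: a caller that never touches the key can leave that parameter unspecified. A generic sketch of the idea (the `Operator` class here is hypothetical, not Flink's):

```scala
// Sketch: leaving a type parameter as a wildcard when the caller doesn't
// depend on it (hypothetical Operator class, not Flink's operator API).
class Operator[KEY, IN, OUT](val name: String)

object WildcardKeySketch {
  // accepts an Operator regardless of its concrete KEY type
  def describe(op: Operator[_, String, String]): String = op.name

  def main(args: Array[String]): Unit =
    // works for Byte-keyed and Int-keyed operators alike
    println(describe(new Operator[Byte, String, String]("InnerRowtimeWindowJoin")))
}
```

This is why `_` is safe in the quoted call site: the surrounding code only forwards the operator, so the concrete key type (`Tuple` today, `Row` after #4732) never leaks into the signature.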
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191905#comment-16191905 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142705409
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
(diff omitted: verbatim repeat of the TimeBoundedStreamInnerJoin.scala diff quoted above)
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191913#comment-16191913 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142762761
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
(diff omitted: verbatim repeat of the TimeBoundedStreamInnerJoin.scala diff quoted above)
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191904#comment-16191904 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142703252
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
(diff omitted: verbatim repeat of the TimeBoundedStreamInnerJoin.scala diff quoted above, up to the flagged line)
+  private var leftExpirationTime: Long = 0L;
--- End diff --
rm `;`
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191898#comment-16191898 ]

ASF GitHub Bot commented on FLINK-6233:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r142689196

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
    @@ -184,4 +229,50 @@ class DataStreamWindowJoin(
           .returns(returnTypeInfo)
       }
     }
    +
    +  def createRowTimeInnerJoin(
    +      leftDataStream: DataStream[CRow],
    +      rightDataStream: DataStream[CRow],
    +      returnTypeInfo: TypeInformation[CRow],
    +      joinFunctionName: String,
    +      joinFunctionCode: String,
    +      leftKeys: Array[Int],
    +      rightKeys: Array[Int]): DataStream[CRow] = {
    +
    +    val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin(
    +      leftLowerBound,
    +      leftUpperBound,
    +      allowedLateness = 0L,
    +      leftSchema.typeInfo,
    +      rightSchema.typeInfo,
    +      joinFunctionName,
    +      joinFunctionCode,
    +      leftTimeIdx,
    +      rightTimeIdx)
    +
    +    if (!leftKeys.isEmpty) {
    +      leftDataStream
    +        .connect(rightDataStream)
    +        .keyBy(leftKeys, rightKeys)
    +        .transform(
    +          "InnerRowtimeWindowJoin",
    +          returnTypeInfo,
    +          new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow](
    +            rowTimeInnerJoinFunc,
    +            rowTimeInnerJoinFunc.getMaxOutputDelay)
    +        )
    +    } else {
    +      leftDataStream.connect(rightDataStream)
    +        .keyBy(new NullByteKeySelector[CRow](), new NullByteKeySelector[CRow])
    +        .transform(
    +          "InnerRowtimeWindowJoin",
    +          returnTypeInfo,
    +          new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow](

--- End diff --

`KEY` type is `Byte` instead of `CRow`

> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-joins on row time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause should include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}} may only use rowtime, which is a system attribute. The time condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not unbounded conditions like {{o.rowtime > s.rowtime}}, and it should include both streams' rowtime attributes; {{o.rowtime between rowtime() and rowtime() + 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because inserting a row into a sorted order would shift all other computations. This would be too expensive to maintain. Therefore, we will throw an error if a user tries to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream joins
> * Translation from Calcite's RelNode representation (LogicalJoin).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
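The bounded time condition in the issue description defines, for every incoming row, a window of joinable timestamps on the other stream. A minimal standalone sketch of that window arithmetic (the class and method names here are illustrative, not part of the PR): for "L.time BETWEEN R.time + X AND R.time + Y", a right-stream row at time `rightTime` matches left rows whose timestamps fall in `[rightTime + X, rightTime + Y]`.

```java
// Hypothetical helper illustrating the window arithmetic of a time-bounded join.
// X = leftLowerBound, Y = leftUpperBound (both in milliseconds, X may be negative).
public class JoinWindow {

    /** Inclusive [lower, upper] range of left-stream timestamps joinable with a right row. */
    public static long[] matchingLeftRange(long rightTime, long leftLowerBound, long leftUpperBound) {
        if (leftLowerBound > leftUpperBound) {
            throw new IllegalArgumentException("lower bound must not exceed upper bound");
        }
        return new long[] {rightTime + leftLowerBound, rightTime + leftUpperBound};
    }

    public static void main(String[] args) {
        // "o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime":
        // X = -3600000 ms, Y = 0 ms
        long[] range = matchingLeftRange(10_000_000L, -3_600_000L, 0L);
        System.out.println("left rows in [" + range[0] + ", " + range[1] + "] can join");
    }
}
```

The same arithmetic, mirrored, bounds which right rows each left row can join, which is what lets both caches be cleaned up once the watermark passes the window.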
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r142710061

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
    @@ -0,0 +1,410 @@
    [...]
    +  private var leftExpirationTime: Long = 0L;
    +  private var rightExpirationTime: Long = 0L;
    +
    +  protected var leftOperatorTime: Long = 0L
    +  protected var rightOperatorTime: Long = 0L
    +
    +  // for delayed cleanup
    +  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
    +
    +  if (allowedLateness < 0) {
    +    throw new IllegalArgumentException("The allowed lateness must be non-negative.")
    +  }
    +
    +  /**
    +   * Gets the maximum interval between receiving a row and emitting it (as part of a joined
    +   * result). Only reasonable for row time joins.
    +   *
    +   * @return the maximum delay for the outputs
    +   */
    +  def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
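The `cleanupDelay` in the hunk quoted above is half of the total window size, derived from the two relative sizes. A standalone sketch of that arithmetic (the class and method names are illustrative; the field semantics mirror the quoted code):

```java
// leftRelativeSize = -leftLowerBound (how far the left window reaches back),
// rightRelativeSize = leftUpperBound (how far it reaches forward); the cleanup
// timer is delayed by half of the combined window size.
public class CleanupDelay {

    public static long cleanupDelay(long leftLowerBound, long leftUpperBound) {
        long leftRelativeSize = -leftLowerBound;
        long rightRelativeSize = leftUpperBound;
        return (leftRelativeSize + rightRelativeSize) / 2;
    }

    public static void main(String[] args) {
        // a "BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime" window:
        // rows are kept for an extra 30 minutes before state cleanup fires
        System.out.println(cleanupDelay(-3_600_000L, 0L));
    }
}
```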
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r142703252

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
    @@ -0,0 +1,410 @@
    [...]
    +  private var leftExpirationTime: Long = 0L;

--- End diff --

rm `;`

---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r142691841

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    [...]
    + */
    +
    +package org.apache.flink.table.runtime.join
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.types.Row
    +
    +/**
    + * The function to execute row (event) time bounded stream inner-joins.
    + */
    +final class RowTimeBoundedStreamInnerJoin(
    +    leftLowerBound: Long,
    +    leftUpperBound: Long,
    +    allowedLateness: Long,
    +    leftType: TypeInformation[Row],
    +    rightType: TypeInformation[Row],
    +    genJoinFuncName: String,
    +    genJoinFuncCode: String,
    +    leftTimeIdx: Int,
    +    rightTimeIdx: Int)
    +  extends TimeBoundedStreamInnerJoin(
    +    leftLowerBound,
    +    leftUpperBound,
    +    allowedLateness,
    +    leftType,
    +    rightType,
    +    genJoinFuncName,
    +    genJoinFuncCode,
    +    leftTimeIdx,
    +    rightTimeIdx) {
    +
    +  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
    +    leftOperatorTime =
    +      if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark()
    +      else 0L
    +    rightOperatorTime =

--- End diff --

just use `leftOperatorTime` to avoid the additional method calls and condition?

---
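The quoted `updateOperatorTime` clamps the current watermark to be non-negative, because before any watermark arrives the timer service reports `Long.MIN_VALUE`. A minimal sketch of that clamping (the class and method names are illustrative, not Flink API):

```java
// Before the first watermark, currentWatermark() is Long.MIN_VALUE; the operator
// time is pinned at 0 so that expiration arithmetic never underflows.
public class OperatorTime {

    public static long fromWatermark(long currentWatermark) {
        return currentWatermark > 0 ? currentWatermark : 0L;
    }

    public static void main(String[] args) {
        System.out.println(fromWatermark(Long.MIN_VALUE)); // no watermark seen yet
        System.out.println(fromWatermark(42L));
    }
}
```

Since both inputs share one operator and thus one watermark, both operator-time fields receive the same value here, which is what motivates the reviewer's suggestion to compute it once and reuse it.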
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r142685896

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
    @@ -184,4 +229,50 @@ class DataStreamWindowJoin(
    [...]
    +    if (!leftKeys.isEmpty) {
    +      leftDataStream
    +        .connect(rightDataStream)
    +        .keyBy(leftKeys, rightKeys)

--- End diff --

we need to make sure to include the fixes of #4732

---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r142705409

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
    @@ -0,0 +1,410 @@
    [...]
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r142714462

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
    @@ -0,0 +1,410 @@
    [...]
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r142762915

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
    @@ -0,0 +1,410 @@
    [...]
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142706610

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * A CoProcessFunction to execute time-bounded stream inner-join.
+ * Two kinds of time criteria:
+ * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X".
+ *
+ * @param leftLowerBound the lower bound for the left stream (X in the criteria)
+ * @param leftUpperBound the upper bound for the left stream (Y in the criteria)
+ * @param allowedLateness the lateness allowed for the two streams
+ * @param leftType the input type of left stream
+ * @param rightType the input type of right stream
+ * @param genJoinFuncName the function name of other non-equi conditions
+ * @param genJoinFuncCode the function code of other non-equi conditions
+ */
+abstract class TimeBoundedStreamInnerJoin(
+    private val leftLowerBound: Long,
+    private val leftUpperBound: Long,
+    private val allowedLateness: Long,
+    private val leftType: TypeInformation[Row],
+    private val rightType: TypeInformation[Row],
+    private val genJoinFuncName: String,
+    private val genJoinFuncCode: String,
+    private val leftTimeIdx: Int,
+    private val rightTimeIdx: Int)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  private var leftExpirationTime: Long = 0L
+  private var rightExpirationTime: Long = 0L
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+  // for delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+    throw new IllegalArgumentException("The allowed lateness must be non-negative.")
+  }
+
+  /**
+   * Get the maximum interval between receiving a row and emitting it (as part of a joined result).
+   * Only reasonable for row time join.
+   *
+   * @return the maximum delay for the outputs
+   */
+  def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
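The bound arithmetic in the quoted constructor can be sketched in isolation. The snippet below mirrors the fields above (`leftRelativeSize = -leftLowerBound`, `rightRelativeSize = leftUpperBound`, `cleanupDelay` as their average). The body of `getMaxOutputDelay` is truncated in the diff, so its completion here (the larger relative size plus the allowed lateness) is an assumption, and the class name is illustrative, not Flink's:

```java
// Illustrates the bound arithmetic of the quoted join for the predicate
// "L.time BETWEEN R.time + X AND R.time + Y" (X = leftLowerBound, Y = leftUpperBound).
public class JoinBounds {
    final long leftRelativeSize;   // how long right rows must be kept for late left rows
    final long rightRelativeSize;  // how long left rows must be kept for late right rows
    final long allowedLateness;
    final long cleanupDelay;       // extra delay before state cleanup fires

    JoinBounds(long leftLowerBound, long leftUpperBound, long allowedLateness) {
        if (allowedLateness < 0) {
            throw new IllegalArgumentException("The allowed lateness must be non-negative.");
        }
        this.leftRelativeSize = -leftLowerBound;
        this.rightRelativeSize = leftUpperBound;
        this.allowedLateness = allowedLateness;
        this.cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2;
    }

    // Maximum interval between receiving a row and emitting it in a joined result
    // (meaningful for row-time joins only). The "+ allowedLateness" term is an
    // assumption; the quoted diff cuts off after Math.max(leftRelativeSize, ...).
    long getMaxOutputDelay() {
        return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;
    }
}
```

For example, the bound "L.time BETWEEN R.time - 5s AND R.time + 6s" gives `leftLowerBound = -5000`, `leftUpperBound = 6000`, so right-side rows are cached for 5 seconds relative to left time, left-side rows for 6 seconds, and cleanup lags by their average.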
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142694926 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142697408 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142705185 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142689196 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -184,4 +229,50 @@ class DataStreamWindowJoin( .returns(returnTypeInfo) } } + + def createRowTimeInnerJoin( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow], + joinFunctionName: String, + joinFunctionCode: String, + leftKeys: Array[Int], + rightKeys: Array[Int]): DataStream[CRow] = { + +val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness = 0L, + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunctionName, + joinFunctionCode, + leftTimeIdx, + rightTimeIdx) + +if (!leftKeys.isEmpty) { + leftDataStream +.connect(rightDataStream) +.keyBy(leftKeys, rightKeys) +.transform( + "InnerRowtimeWindowJoin", + returnTypeInfo, + new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow]( +rowTimeInnerJoinFunc, +rowTimeInnerJoinFunc.getMaxOutputDelay) +) +} else { + leftDataStream.connect(rightDataStream) +.keyBy(new NullByteKeySelector[CRow](), new NullByteKeySelector[CRow]) +.transform( + "InnerRowtimeWindowJoin", + returnTypeInfo, + new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow]( --- End diff -- `KEY` type is `Byte` instead of `CRow` ---
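The comment concerns the operator's `KEY` type parameter, which must match what the `KeySelector` produces: `NullByteKeySelector` emits a constant `Byte`, so the operator must be typed with `Byte`, not `CRow`. A simplified mock of that relationship (these interfaces only imitate Flink's; the real `KeyedCoProcessOperatorWithWatermarkDelay` carries more machinery):

```java
// Minimal mock of the KeySelector/operator typing the review comment is about.
interface KeySelector<IN, KEY> {
    KEY getKey(IN value);
}

// Mirrors Flink's NullByteKeySelector: every element maps to the same Byte key,
// which routes all records to a single partition when no equi-join keys exist.
class NullByteKeySelector<IN> implements KeySelector<IN, Byte> {
    public Byte getKey(IN value) {
        return (byte) 0;
    }
}

public class KeyTypeDemo {
    // A keyed operator is parameterized by the KEY type the selector produces.
    // Declaring the operator over CRow keys while keying with NullByteKeySelector
    // would not type-check, which is the error the reviewer points out.
    static <IN, KEY> KEY keyOf(KeySelector<IN, KEY> selector, IN value) {
        return selector.getKey(value);
    }
}
```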
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142682283 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -152,19 +176,40 @@ class DataStreamWindowJoin( } } - def createProcTimeInnerJoinFunction( + def createEmptyInnerJoin( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow]) = { +leftDataStream.connect(rightDataStream).process( + new CoProcessFunction[CRow, CRow, CRow] { +override def processElement1( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]) = { + //Do nothing. +} +override def processElement2( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]) = { + //Do nothing. +} + }) --- End diff -- add a `returns(returnTypeInfo)` call to ensure we use the right type. ---
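The suggested `returns(returnTypeInfo)` call matters because of JVM type erasure: at runtime, the anonymous `CoProcessFunction[CRow, CRow, CRow]` carries no usable record of its type arguments, so the planner must hand the output type to the operator explicitly. A minimal, self-contained illustration of the underlying language behavior (not Flink code):

```java
import java.util.ArrayList;

public class ErasureDemo {
    // Generic type arguments are erased at runtime: an ArrayList<String> and an
    // ArrayList<Integer> share the same Class object. A framework therefore cannot
    // recover the produced element type from the function object alone, which is
    // why Flink's DataStream API offers explicit type hints like returns(...).
    static boolean sameRuntimeClass() {
        return new ArrayList<String>().getClass() == new ArrayList<Integer>().getClass();
    }
}
```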
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142769397 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -102,5 +117,154 @@ class JoinITCase extends StreamingWithStateTestBase { env.execute() } + /** test rowtime inner join **/ + @Test + def testRowTimeInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.clear +env.setParallelism(1) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 join T2 as t2 ON +| t1.a = t2.a AND +| t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND +|t2.rt + INTERVAL '6' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(Int, Long, String, Long)] --- End diff -- Add two rows with null keys on both sides within join window boundaries to test that join predicates on null values are not evaluated to true. For this to work we need to also fix the `keyBy()` calls to support partitioning of null keys (see #4732) ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142681690 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -152,19 +176,40 @@ class DataStreamWindowJoin( } } - def createProcTimeInnerJoinFunction( + def createEmptyInnerJoin( --- End diff -- please add the return type for the method `DataStream[CRow]` ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142681890 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -152,19 +176,40 @@ class DataStreamWindowJoin( } } - def createProcTimeInnerJoinFunction( + def createEmptyInnerJoin( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow]) = { +leftDataStream.connect(rightDataStream).process( + new CoProcessFunction[CRow, CRow, CRow] { +override def processElement1( --- End diff -- add `Unit` return types for both `processElement` methods. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142679463 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala --- @@ -105,6 +104,8 @@ class DataStreamWindowJoinRule windowBounds.get.isEventTime, windowBounds.get.leftLowerBound, windowBounds.get.leftUpperBound, + windowBounds.get.leftTimeIdx, + windowBounds.get.rightTimeIdx, remainCondition, --- End diff -- The `remainCondition` must include the equi-join predicates to ensure that the join condition is correctly evaluated for `null` values (see FLINK-7755 for details). To solve this, I'd suggest to call `WindowJoinUtil.extractWindowBoundsFromPredicate` with `join.getCondition` instead of `joinInfo.getRemaining(join.getCluster.getRexBuilder)`. ---
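The rationale for keeping the equi-join predicates in `remainCondition` is SQL's three-valued logic: an equality comparison with a NULL key evaluates to UNKNOWN, never TRUE, so null-keyed rows must not produce join matches. A minimal sketch of that rule, independent of Flink (the method name is illustrative):

```java
import java.util.Objects;

public class NullJoinSemantics {
    // SQL semantics: "a = b" is not TRUE when either side is NULL, so an
    // inner-join predicate over nullable keys must reject null-keyed rows
    // rather than letting a partitioning step pair them up silently.
    static boolean equiJoinMatches(Long leftKey, Long rightKey) {
        if (leftKey == null || rightKey == null) {
            return false; // NULL = NULL evaluates to UNKNOWN, not TRUE
        }
        return Objects.equals(leftKey, rightKey);
    }
}
```

Evaluating the full join condition (including the equi-part) after partitioning, as the comment suggests, restores exactly this behavior even when the keying step groups null keys together.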
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142688996 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -184,4 +229,50 @@ class DataStreamWindowJoin( .returns(returnTypeInfo) } } + + def createRowTimeInnerJoin( + leftDataStream: DataStream[CRow], + rightDataStream: DataStream[CRow], + returnTypeInfo: TypeInformation[CRow], + joinFunctionName: String, + joinFunctionCode: String, + leftKeys: Array[Int], + rightKeys: Array[Int]): DataStream[CRow] = { + +val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness = 0L, + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunctionName, + joinFunctionCode, + leftTimeIdx, + rightTimeIdx) + +if (!leftKeys.isEmpty) { + leftDataStream +.connect(rightDataStream) +.keyBy(leftKeys, rightKeys) +.transform( + "InnerRowtimeWindowJoin", + returnTypeInfo, + new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow]( --- End diff -- In the current implementation the `KEY` type would be a `Tuple`, but I think we can just pass `_` here. When we adopt #4732, the key will be `Row`. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142692599 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- +private val leftTimeIdx: Int, --- End diff -- The time indices are only needed by `RowTimeBoundedStreamInnerJoin`. They can be removed here. ---
[jira] [Commented] (FLINK-7643) Configure FileSystems only once
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191887#comment-16191887 ] ASF GitHub Bot commented on FLINK-7643: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/4776 [FLINK-7643] [core] Rework FileSystem loading to use factories ---
[GitHub] flink pull request #4776: [FLINK-7643] [core] Rework FileSystem loading to u...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/4776 [FLINK-7643] [core] Rework FileSystem loading to use factories

## What is the purpose of the change

This change reworks the loading and instantiation of File System objects (including file systems supported via Hadoop) to use factories. This makes sure that configurations (Flink and possibly Hadoop) are loaded once (on TaskManager / JobManager startup) and file system instances are properly reused by scheme and authority. This change is also a prerequisite for an extensible file system loading mechanism via a service framework.

## Brief change log

- The special-case configuration of the `FileSystem` class to set the "default file system scheme" is extended to a generic configuration call.
- The directory of directly supported file systems is changed from classes (instantiated via reflection) to factories.
- These factories are also configured when the `FileSystem` is configured.
- The Hadoop file system factory loads the Hadoop configuration once when being configured and applies it to all subsequently instantiated file systems.
- File systems supported via Hadoop are now properly cached and not reloaded, reinstantiated, and reconfigured on each access.
- This also throws out a lot of legacy code for how to find Hadoop file system implementations.
- The `FileSystem` class is much cleaner now because a lot of the Hadoop FS specific handling was moved out of it.
- All file systems now eagerly initialize their settings, rather than dividing that between the constructor and the `initialize()` method.
- This also factors out a lot of the special treatment of Hadoop file systems and simply makes the Hadoop File System factory the default fallback factory.
## Verifying this change

Reworked some tests to cover the behavior of this change:
- `flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java`
- `flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

*Note:* The breaking changes made on the `@Public` class `FileSystem` do not affect methods that are meant for users, only the setup configuration.

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink fs_fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4776.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4776

commit ba312e137c7af1d2c331c5231b5b0ae3e0401549
Author: Stephan Ewen
Date: 2017-10-02T12:34:27Z

    [FLINK-7643] [core] Misc. cleanups in FileSystem

    - Simplify access to the local file system
    - Use a fair lock for all FileSystem.get() operations
    - Robust fallback to the local fs for the default scheme (avoids a URI parsing error on Windows)
    - Deprecate 'getDefaultBlockSize()'
    - Deprecate create(...) with block sizes and replication factor, which is not applicable to many file systems

commit 8130d874b8b823f22964f435bf1a1d1bd39774d6
Author: Stephan Ewen
Date: 2017-10-02T14:25:18Z

    [FLINK-7643] [core] Rework FileSystem loading to use factories

    This makes sure that configurations are loaded once and file system instances are properly reused by scheme and authority.

    This also factors out a lot of the special treatment of Hadoop file systems and simply makes the Hadoop file system factory the default fallback factory.

commit c652f1322044f9715a0d94fa21ec853769be9a78
Author: Stephan Ewen
Date: 2017-10-02T14:30:07Z

    [FLINK-7643] [core] Drop eager checks for file system support.

    Some places validate whether the file URIs are resolvable on the client. This leads to problems when file systems are not accessible from the client, or when the full libraries for the file systems are not present on the client (for example often
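The "reused by scheme and authority" caching described in the PR can be sketched roughly as follows. This is a hedged, self-contained illustration, not Flink's actual `FileSystem` / factory API: `FsRegistry`, `Fs`, and the use of `java.util.function.Function` as the factory type are all hypothetical stand-ins.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: file system instances are cached under a
// "scheme://authority" key and created lazily by a per-scheme factory.
// Unknown schemes fall back to a default factory (in Flink's case,
// the Hadoop-based one, per the PR description).
public class FsRegistry {

    /** Minimal stand-in for a file system handle. */
    public static final class Fs {
        final String key;
        Fs(String key) { this.key = key; }
    }

    private final Map<String, Function<URI, Fs>> factories = new HashMap<>();
    private final Map<String, Fs> cache = new HashMap<>();
    private final Function<URI, Fs> fallback;

    public FsRegistry(Function<URI, Fs> fallbackFactory) {
        this.fallback = fallbackFactory; // e.g. a Hadoop-backed default
    }

    public void registerFactory(String scheme, Function<URI, Fs> factory) {
        factories.put(scheme, factory);
    }

    /** Returns the cached instance for the URI's scheme + authority, creating it once. */
    public synchronized Fs get(URI uri) {
        String key = uri.getScheme() + "://"
                + (uri.getAuthority() == null ? "" : uri.getAuthority());
        return cache.computeIfAbsent(key, k ->
                factories.getOrDefault(uri.getScheme(), fallback).apply(uri));
    }

    public static void main(String[] args) {
        FsRegistry registry = new FsRegistry(u -> new Fs("fallback:" + u));
        registry.registerFactory("s3", u -> new Fs("s3:" + u.getAuthority()));

        Fs a = registry.get(URI.create("s3://bucket-a/path"));
        Fs b = registry.get(URI.create("s3://bucket-a/other"));
        Fs c = registry.get(URI.create("s3://bucket-b/path"));

        // Same scheme + authority -> same cached instance; different authority -> new one.
        System.out.println(a == b);  // true
        System.out.println(a == c);  // false
    }
}
```

The key point the PR makes is that the expensive per-instance work (loading the Hadoop configuration) happens once in the factory at configuration time, so a cache hit never re-reads configuration.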
[GitHub] flink pull request #4771: [hotfix] [hbase] Set root log level to OFF for fli...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4771 ---
[jira] [Commented] (FLINK-7687) Clarify the master and slaves files are not necessary unless using the cluster start/stop scripts
[ https://issues.apache.org/jira/browse/FLINK-7687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191552#comment-16191552 ]

Elias Levy commented on FLINK-7687:
-----------------------------------

And?

> Clarify the master and slaves files are not necessary unless using the cluster start/stop scripts
> -------------------------------------------------------------------------------------------------
>
>             Key: FLINK-7687
>             URL: https://issues.apache.org/jira/browse/FLINK-7687
>         Project: Flink
>      Issue Type: Improvement
>      Components: Documentation
>    Affects Versions: 1.3.2
>        Reporter: Elias Levy
>        Priority: Minor
>
> It would be helpful if the documentation was clearer on the fact that the master/slaves config files are not needed when configured in high-availability mode unless you are using the provided scripts to start and shut down the cluster over SSH. If you are using some other mechanism to manage Flink instances (configuration management tools such as Chef or Ansible, or container management frameworks like Docker Compose or Kubernetes), these files are unnecessary.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #4775: [FLINK-7739] Fix KafkaXXITCase tests stability
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4775

[FLINK-7739] Fix KafkaXXITCase tests stability

## What is the purpose of the change

This change fixes the stability of the Kafka*ITCase tests. The main fix is excluding the `netty` dependency from ZooKeeper; the other two changes are probably just cosmetic. For more info please look into the individual commit messages.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink kafka-test2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4775.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4775

commit c7cc24d062aa233d86b68b7438c9a4e717003393
Author: Piotr Nowojski
Date: 2017-09-29T16:23:29Z

    [FLINK-7739][kafka-tests] Set shorter heartbeat intervals

    The default pause value of 60 seconds is too large (tests would time out before Akka reacts).

commit 1677791f10153b9f7ecd552eac148d6ae3d056f1
Author: Piotr Nowojski
Date: 2017-10-04T11:48:11Z

    [FLINK-7739][kafka-tests] Set restart delay to non-zero

    Give TaskManagers some time to clean up before restarting a job.

commit 937c3fb388d9d7104b6336f59c3674bb70bfbf50
Author: Piotr Nowojski
Date: 2017-10-04T14:50:57Z

    [FLINK-7739] Exclude netty dependency from zookeeper

    ZooKeeper was pulling in a conflicting Netty version. The conflict was extremely subtle - the TaskManager in the Kafka tests was deadlocking in some rare corner cases.

---
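Excluding a transitive dependency as described in the last commit would typically be done with a Maven `<exclusions>` block. The snippet below is a hedged sketch of what such a declaration looks like in general; the `groupId`/`artifactId` coordinates and the ZooKeeper version shown are illustrative assumptions, not necessarily what this PR changes in Flink's POMs.

```xml
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version> <!-- illustrative version -->
    <exclusions>
        <!-- ZooKeeper pulls in its own Netty, which can conflict with the
             Netty version the rest of the build resolves to. -->
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

After such a change, `mvn dependency:tree` is the usual way to verify that only one Netty version remains on the test classpath.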
[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability
[ https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191517#comment-16191517 ]

ASF GitHub Bot commented on FLINK-7739:
---------------------------------------

GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4775

[FLINK-7739] Fix KafkaXXITCase tests stability

## What is the purpose of the change

This change fixes the stability of the Kafka*ITCase tests. The main fix is excluding the `netty` dependency from ZooKeeper; the other two changes are probably just cosmetic. For more info please look into the individual commit messages.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink kafka-test2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4775.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4775

commit c7cc24d062aa233d86b68b7438c9a4e717003393
Author: Piotr Nowojski
Date: 2017-09-29T16:23:29Z

    [FLINK-7739][kafka-tests] Set shorter heartbeat intervals

    The default pause value of 60 seconds is too large (tests would time out before Akka reacts).

commit 1677791f10153b9f7ecd552eac148d6ae3d056f1
Author: Piotr Nowojski
Date: 2017-10-04T11:48:11Z

    [FLINK-7739][kafka-tests] Set restart delay to non-zero

    Give TaskManagers some time to clean up before restarting a job.

commit 937c3fb388d9d7104b6336f59c3674bb70bfbf50
Author: Piotr Nowojski
Date: 2017-10-04T14:50:57Z

    [FLINK-7739] Exclude netty dependency from zookeeper

    ZooKeeper was pulling in a conflicting Netty version. The conflict was extremely subtle - the TaskManager in the Kafka tests was deadlocking in some rare corner cases.
> Improve Kafka*ITCase tests stability
> ------------------------------------
>
>             Key: FLINK-7739
>             URL: https://issues.apache.org/jira/browse/FLINK-7739
>         Project: Flink
>      Issue Type: Improvement
>      Components: Kafka Connector
>    Affects Versions: 1.3.2
>        Reporter: Piotr Nowojski
>        Assignee: Piotr Nowojski
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)